Skip to content

Commit d783df6

Browse files
committed
groupid check null
1 parent 72a90a9 commit d783df6

4 files changed

Lines changed: 11 additions & 5 deletions

File tree

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
7171
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
72-
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
72+
if (StringUtils.isNotBlank(kafka09SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
74+
}
7375
// only required for Kafka 0.8
7476
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7577

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka010SourceTableInfo.getBootstrapServers());
7171
props.setProperty("auto.offset.reset", kafka010SourceTableInfo.getOffsetReset());
72-
props.setProperty("group.id", kafka010SourceTableInfo.getGroupId());
72+
if (StringUtils.isNotBlank(kafka010SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka010SourceTableInfo.getGroupId());
74+
}
7375
// only required for Kafka 0.8
7476
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7577

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka011SourceTableInfo.getBootstrapServers());
7171
props.setProperty("auto.offset.reset", kafka011SourceTableInfo.getOffsetReset());
72-
props.setProperty("group.id", kafka011SourceTableInfo.getGroupId());
72+
if (StringUtils.isNotBlank(kafka011SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka011SourceTableInfo.getGroupId());
74+
}
7375
// only required for Kafka 0.8
7476
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7577

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void fillRealIndexes() throws SQLException {
7070
}
7171
String column_name = rs.getString("COLUMN_NAME");
7272
if (StringUtils.isNotBlank(column_name)) {
73-
column_name = column_name.toUpperCase();
73+
column_name = column_name;
7474
}
7575
map.get(indexName).add(column_name);
7676
}
@@ -94,7 +94,7 @@ public void fillFullColumns() throws SQLException {
9494
while (rs.next()) {
9595
String columnName = rs.getString("COLUMN_NAME");
9696
if (StringUtils.isNotBlank(columnName)) {
97-
getFullField().add(columnName.toUpperCase());
97+
getFullField().add(columnName);
9898
}
9999
}
100100
}

0 commit comments

Comments
 (0)