Skip to content

Commit 21f3020

Browse files
author
yanxi0227
committed
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v1.5.0_dev
2 parents ab515e7 + c9ecc83 commit 21f3020

7 files changed

Lines changed: 13 additions & 7 deletions

File tree

docs/kafkaSource.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ CREATE TABLE tableName(
3939
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
4040
|topic | 需要读取的 topic 名称|||
4141
|topicIsPattern | topic是否是正则表达式格式|否| false
42-
|groupId | 需要读取的 groupId 名称|||
42+
|groupId | 需要读取的 groupId 名称|||
4343
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest\|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
4444
|parallelism | 并行度设置||1|
4545

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
7171
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
72-
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
72+
if (StringUtils.isNotBlank(kafka09SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
74+
}
7375
// only required for Kafka 0.8
7476
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7577

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void setOffset(String offset) {
121121
public boolean check() {
122122
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
123123
Preconditions.checkNotNull(topic, "kafka of topic is required");
124-
Preconditions.checkNotNull(groupId, "kafka of groupId is required");
124+
//Preconditions.checkNotNull(groupId, "kafka of groupId is required");
125125
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
126126
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
127127

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka010SourceTableInfo.getBootstrapServers());
7171
props.setProperty("auto.offset.reset", kafka010SourceTableInfo.getOffsetReset());
72-
props.setProperty("group.id", kafka010SourceTableInfo.getGroupId());
72+
if (StringUtils.isNotBlank(kafka010SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka010SourceTableInfo.getGroupId());
74+
}
7375
// only required for Kafka 0.8
7476
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7577

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void setTopicIsPattern(Boolean topicIsPattern) {
121121
public boolean check() {
122122
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
123123
Preconditions.checkNotNull(topic, "kafka of topic is required");
124-
Preconditions.checkNotNull(groupId, "kafka of groupId is required");
124+
//Preconditions.checkNotNull(groupId, "kafka of groupId is required");
125125
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
126126
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
127127

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka011SourceTableInfo.getBootstrapServers());
7171
props.setProperty("auto.offset.reset", kafka011SourceTableInfo.getOffsetReset());
72-
props.setProperty("group.id", kafka011SourceTableInfo.getGroupId());
72+
if (StringUtils.isNotBlank(kafka011SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka011SourceTableInfo.getGroupId());
74+
}
7375
// only required for Kafka 0.8
7476
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7577

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void setOffset(String offset) {
122122
public boolean check() {
123123
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
124124
Preconditions.checkNotNull(topic, "kafka of topic is required");
125-
Preconditions.checkNotNull(groupId, "kafka of groupId is required");
125+
//Preconditions.checkNotNull(groupId, "kafka of groupId is required");
126126
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
127127
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
128128

0 commit comments

Comments
 (0)