Skip to content

Commit a635352

Browse files
committed
add regex pattern matching for Kafka topics; fix invalid groupId handling and a bug where the check() method was never called
1 parent f8d6923 commit a635352

15 files changed

Lines changed: 337 additions & 226 deletions

File tree

docs/kafkaSource.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ CREATE TABLE tableName(
3838
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
3939
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
4040
|topic | 需要读取的 topic 名称|||
41+
|topicIsPattern | topic是否是正则表达式格式|否| false
42+
|groupId | 需要读取的 groupId 名称|||
4143
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest\|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
4244
|parallelism | 并行度设置||1|
4345

@@ -54,7 +56,10 @@ CREATE TABLE MyTable(
5456
bootstrapServers ='172.16.8.198:9092',
5557
zookeeperQuorum ='172.16.8.198:2181/kafka',
5658
offsetReset ='latest',
57-
topic ='nbTest1',
59+
groupId='nbTest',
60+
topic ='nbTest1,nbTest2,nbTest3',
61+
--- topic ='mqTest.*',
62+
---topicIsPattern='true',
5863
parallelism ='1'
5964
);
6065
```

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafka09Consumer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.apache.flink.types.Row;
3131
import org.apache.flink.util.SerializedValue;
3232

33+
import java.util.Arrays;
3334
import java.util.Map;
3435
import java.util.Properties;
36+
import java.util.regex.Pattern;
3537

3638
/**
3739
* Reason:
@@ -47,7 +49,11 @@ public class CustomerKafka09Consumer extends FlinkKafkaConsumer09<Row> {
4749
private CustomerJsonDeserialization customerJsonDeserialization;
4850

4951
public CustomerKafka09Consumer(String topic, AbsDeserialization valueDeserializer, Properties props) {
50-
super(topic, valueDeserializer, props);
52+
super(Arrays.asList(topic.split(",")), valueDeserializer, props);
53+
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
54+
}
55+
public CustomerKafka09Consumer(Pattern subscriptionPattern, AbsDeserialization<Row> valueDeserializer, Properties props) {
56+
super(subscriptionPattern, valueDeserializer, props);
5157
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
5258
}
5359

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.table.SourceTableInfo;
2626
import com.dtstack.flink.sql.util.DtStringUtil;
2727
import com.dtstack.flink.sql.util.PluginUtil;
28+
import org.apache.commons.lang3.BooleanUtils;
2829
import org.apache.commons.lang3.StringUtils;
2930
import org.apache.flink.api.common.functions.RuntimeContext;
3031
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -39,6 +40,7 @@
3940
import java.util.HashMap;
4041
import java.util.Map;
4142
import java.util.Properties;
43+
import java.util.regex.Pattern;
4244

4345
/**
4446
* If eventtime field is specified, the default time field rowtime
@@ -67,6 +69,8 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6769
Properties props = new Properties();
6870
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
6971
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
72+
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
73+
// only required for Kafka 0.8
7074
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7175

7276
TypeInformation[] types = new TypeInformation[kafka09SourceTableInfo.getFields().length];
@@ -75,8 +79,14 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
7579
}
7680

7781
TypeInformation<Row> typeInformation = new RowTypeInfo(types, kafka09SourceTableInfo.getFields());
78-
FlinkKafkaConsumer09<Row> kafkaSrc = new CustomerKafka09Consumer(topicName,
79-
new CustomerJsonDeserialization(typeInformation), props);
82+
FlinkKafkaConsumer09<Row> kafkaSrc;
83+
if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
84+
kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
85+
new CustomerJsonDeserialization(typeInformation), props);
86+
} else {
87+
kafkaSrc = new CustomerKafka09Consumer(topicName,
88+
new CustomerJsonDeserialization(typeInformation), props);
89+
}
8090

8191
//earliest,latest
8292
if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4747
kafka09SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
4848
kafka09SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
4949
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
50+
kafka09SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
51+
kafka09SourceTableInfo.check();
5052
return kafka09SourceTableInfo;
5153
}
5254
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 76 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.source.kafka.table;
2221

@@ -27,96 +26,111 @@
2726
* Reason:
2827
* Date: 2018/6/22
2928
* Company: www.dtstack.com
29+
*
3030
* @author xuchao
3131
*/
3232

3333
public class KafkaSourceTableInfo extends SourceTableInfo {
3434

35-
//version
36-
private static final String CURR_TYPE = "kafka09";
35+
//version
36+
private static final String CURR_TYPE = "kafka09";
37+
38+
public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
39+
40+
public static final String TOPIC_KEY = "topic";
41+
42+
public static final String GROUPID_KEY = "groupId";
43+
44+
public static final String OFFSETRESET_KEY = "offsetReset";
45+
46+
public static final String TOPICISPATTERN_KEY = "topicIsPattern";
3747

38-
public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
48+
private String bootstrapServers;
3949

40-
public static final String TOPIC_KEY = "topic";
50+
private String topic;
4151

42-
public static final String GROUPID_KEY = "groupId";
52+
private String groupId;
4353

44-
public static final String OFFSETRESET_KEY="offsetReset";
54+
//latest, earliest
55+
private String offsetReset = "latest";
4556

46-
private String bootstrapServers;
57+
private String offset;
4758

48-
private String topic;
59+
private Boolean topicIsPattern = false;
4960

50-
private String groupId;
61+
public Boolean getTopicIsPattern() {
62+
return topicIsPattern;
63+
}
5164

52-
//latest, earliest
53-
private String offsetReset = "latest";
65+
public void setTopicIsPattern(Boolean topicIsPattern) {
66+
if (topicIsPattern == null) return;
5467

55-
private String offset;
68+
this.topicIsPattern = topicIsPattern;
69+
}
5670

57-
public KafkaSourceTableInfo(){
58-
super.setType(CURR_TYPE);
59-
}
71+
public KafkaSourceTableInfo() {
72+
super.setType(CURR_TYPE);
73+
}
6074

6175

62-
public String getBootstrapServers() {
63-
return bootstrapServers;
64-
}
76+
public String getBootstrapServers() {
77+
return bootstrapServers;
78+
}
6579

66-
public void setBootstrapServers(String bootstrapServers) {
67-
this.bootstrapServers = bootstrapServers;
68-
}
80+
public void setBootstrapServers(String bootstrapServers) {
81+
this.bootstrapServers = bootstrapServers;
82+
}
6983

70-
public String getTopic() {
71-
return topic;
72-
}
84+
public String getTopic() {
85+
return topic;
86+
}
7387

74-
public void setTopic(String topic) {
75-
this.topic = topic;
76-
}
88+
public void setTopic(String topic) {
89+
this.topic = topic;
90+
}
7791

78-
public String getGroupId() {
79-
return groupId;
80-
}
92+
public String getGroupId() {
93+
return groupId;
94+
}
8195

82-
public void setGroupId(String groupId) {
83-
this.groupId = groupId;
84-
}
96+
public void setGroupId(String groupId) {
97+
this.groupId = groupId;
98+
}
8599

86-
public String getOffsetReset() {
87-
return offsetReset;
88-
}
100+
public String getOffsetReset() {
101+
return offsetReset;
102+
}
89103

90-
public void setOffsetReset(String offsetReset) {
91-
if(offsetReset == null){
92-
return;
93-
}
104+
public void setOffsetReset(String offsetReset) {
105+
if (offsetReset == null) {
106+
return;
107+
}
94108

95-
this.offsetReset = offsetReset;
96-
}
109+
this.offsetReset = offsetReset;
110+
}
97111

98-
public String getOffset() {
99-
return offset;
100-
}
112+
public String getOffset() {
113+
return offset;
114+
}
101115

102-
public void setOffset(String offset) {
103-
this.offset = offset;
104-
}
116+
public void setOffset(String offset) {
117+
this.offset = offset;
118+
}
105119

106-
@Override
107-
public boolean check() {
108-
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
109-
Preconditions.checkNotNull(topic, "kafka of topic is required");
110-
Preconditions.checkNotNull(groupId, "kafka of groupId is required");
111-
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
112-
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
120+
@Override
121+
public boolean check() {
122+
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
123+
Preconditions.checkNotNull(topic, "kafka of topic is required");
124+
Preconditions.checkNotNull(groupId, "kafka of groupId is required");
125+
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
126+
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
113127

114-
return false;
115-
}
128+
return false;
129+
}
116130

117-
@Override
118-
public String getType() {
131+
@Override
132+
public String getType() {
119133
// return super.getType() + SOURCE_SUFFIX;
120-
return super.getType();
121-
}
134+
return super.getType();
135+
}
122136
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafka010Consumer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.apache.flink.types.Row;
3131
import org.apache.flink.util.SerializedValue;
3232

33+
import java.util.Arrays;
3334
import java.util.Map;
3435
import java.util.Properties;
36+
import java.util.regex.Pattern;
3537

3638
/**
3739
* Reason:
@@ -47,10 +49,14 @@ public class CustomerKafka010Consumer extends FlinkKafkaConsumer010<Row> {
4749
private CustomerJsonDeserialization customerJsonDeserialization;
4850

4951
public CustomerKafka010Consumer(String topic, AbsDeserialization valueDeserializer, Properties props) {
50-
super(topic, valueDeserializer, props);
52+
super(Arrays.asList(topic.split(",")), valueDeserializer, props);
5153
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
5254
}
5355

56+
public CustomerKafka010Consumer(Pattern subscriptionPattern, AbsDeserialization<Row> valueDeserializer, Properties props) {
57+
super(subscriptionPattern, valueDeserializer, props);
58+
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
59+
}
5460
@Override
5561
public void run(SourceContext<Row> sourceContext) throws Exception {
5662
customerJsonDeserialization.setRuntimeContext(getRuntimeContext());

0 commit comments

Comments
 (0)