Skip to content

Commit b5c994a

Browse files
author
sishu@dtstack.com
committed
Merge branch 'v1.5.0_release'
ysq
2 parents f223993 + c9ecc83 commit b5c994a

18 files changed

Lines changed: 361 additions & 230 deletions

File tree

docs/kafkaSource.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ CREATE TABLE tableName(
3838
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
3939
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
4040
|topic | 需要读取的 topic 名称|||
41+
|topicIsPattern | topic是否是正则表达式格式|否|false|
42+
|groupId | 需要读取的 groupId 名称|||
4143
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest\|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
4244
|parallelism | 并行度设置||1|
4345

@@ -54,7 +56,10 @@ CREATE TABLE MyTable(
5456
bootstrapServers ='172.16.8.198:9092',
5557
zookeeperQuorum ='172.16.8.198:2181/kafka',
5658
offsetReset ='latest',
57-
topic ='nbTest1',
59+
groupId='nbTest',
60+
topic ='nbTest1,nbTest2,nbTest3',
61+
--- topic ='mqTest.*',
62+
---topicIsPattern='true',
5863
parallelism ='1'
5964
);
6065
```

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafka09Consumer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.apache.flink.types.Row;
3131
import org.apache.flink.util.SerializedValue;
3232

33+
import java.util.Arrays;
3334
import java.util.Map;
3435
import java.util.Properties;
36+
import java.util.regex.Pattern;
3537

3638
/**
3739
* Reason:
@@ -47,7 +49,11 @@ public class CustomerKafka09Consumer extends FlinkKafkaConsumer09<Row> {
4749
private CustomerJsonDeserialization customerJsonDeserialization;
4850

4951
public CustomerKafka09Consumer(String topic, AbsDeserialization valueDeserializer, Properties props) {
50-
super(topic, valueDeserializer, props);
52+
super(Arrays.asList(topic.split(",")), valueDeserializer, props);
53+
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
54+
}
55+
public CustomerKafka09Consumer(Pattern subscriptionPattern, AbsDeserialization<Row> valueDeserializer, Properties props) {
56+
super(subscriptionPattern, valueDeserializer, props);
5157
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
5258
}
5359

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.table.SourceTableInfo;
2626
import com.dtstack.flink.sql.util.DtStringUtil;
2727
import com.dtstack.flink.sql.util.PluginUtil;
28+
import org.apache.commons.lang3.BooleanUtils;
2829
import org.apache.commons.lang3.StringUtils;
2930
import org.apache.flink.api.common.functions.RuntimeContext;
3031
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -39,6 +40,7 @@
3940
import java.util.HashMap;
4041
import java.util.Map;
4142
import java.util.Properties;
43+
import java.util.regex.Pattern;
4244

4345
/**
4446
* If eventtime field is specified, the default time field rowtime
@@ -67,6 +69,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6769
Properties props = new Properties();
6870
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
6971
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
72+
if (StringUtils.isNotBlank(kafka09SourceTableInfo.getGroupId())){
73+
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
74+
}
75+
// only required for Kafka 0.8
7076
//TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.)
7177

7278
TypeInformation[] types = new TypeInformation[kafka09SourceTableInfo.getFields().length];
@@ -75,8 +81,14 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
7581
}
7682

7783
TypeInformation<Row> typeInformation = new RowTypeInfo(types, kafka09SourceTableInfo.getFields());
78-
FlinkKafkaConsumer09<Row> kafkaSrc = new CustomerKafka09Consumer(topicName,
79-
new CustomerJsonDeserialization(typeInformation), props);
84+
FlinkKafkaConsumer09<Row> kafkaSrc;
85+
if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
86+
kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
87+
new CustomerJsonDeserialization(typeInformation), props);
88+
} else {
89+
kafkaSrc = new CustomerKafka09Consumer(topicName,
90+
new CustomerJsonDeserialization(typeInformation), props);
91+
}
8092

8193
//earliest,latest
8294
if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.source.kafka.table;
2222

@@ -46,6 +46,9 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4646
kafka09SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4747
kafka09SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
4848
kafka09SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
49+
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
50+
kafka09SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
51+
kafka09SourceTableInfo.check();
4952
return kafka09SourceTableInfo;
5053
}
5154
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 77 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.source.kafka.table;
2221

@@ -27,94 +26,111 @@
2726
* Reason:
2827
* Date: 2018/6/22
2928
* Company: www.dtstack.com
29+
*
3030
* @author xuchao
3131
*/
3232

3333
public class KafkaSourceTableInfo extends SourceTableInfo {
3434

35-
//version
36-
private static final String CURR_TYPE = "kafka09";
35+
//version
36+
private static final String CURR_TYPE = "kafka09";
37+
38+
public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
39+
40+
public static final String TOPIC_KEY = "topic";
41+
42+
public static final String GROUPID_KEY = "groupId";
43+
44+
public static final String OFFSETRESET_KEY = "offsetReset";
45+
46+
public static final String TOPICISPATTERN_KEY = "topicIsPattern";
47+
48+
private String bootstrapServers;
3749

38-
public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
50+
private String topic;
3951

40-
public static final String TOPIC_KEY = "topic";
52+
private String groupId;
4153

42-
public static final String GROUPID_KEY = "groupId";
54+
//latest, earliest
55+
private String offsetReset = "latest";
4356

44-
private String bootstrapServers;
57+
private String offset;
4558

46-
private String topic;
59+
private Boolean topicIsPattern = false;
4760

48-
private String groupId;
61+
public Boolean getTopicIsPattern() {
62+
return topicIsPattern;
63+
}
4964

50-
//latest, earliest
51-
private String offsetReset = "latest";
65+
public void setTopicIsPattern(Boolean topicIsPattern) {
66+
if (topicIsPattern == null) return;
5267

53-
private String offset;
68+
this.topicIsPattern = topicIsPattern;
69+
}
5470

55-
public KafkaSourceTableInfo(){
56-
super.setType(CURR_TYPE);
57-
}
71+
public KafkaSourceTableInfo() {
72+
super.setType(CURR_TYPE);
73+
}
5874

5975

60-
public String getBootstrapServers() {
61-
return bootstrapServers;
62-
}
76+
public String getBootstrapServers() {
77+
return bootstrapServers;
78+
}
6379

64-
public void setBootstrapServers(String bootstrapServers) {
65-
this.bootstrapServers = bootstrapServers;
66-
}
80+
public void setBootstrapServers(String bootstrapServers) {
81+
this.bootstrapServers = bootstrapServers;
82+
}
6783

68-
public String getTopic() {
69-
return topic;
70-
}
84+
public String getTopic() {
85+
return topic;
86+
}
7187

72-
public void setTopic(String topic) {
73-
this.topic = topic;
74-
}
88+
public void setTopic(String topic) {
89+
this.topic = topic;
90+
}
7591

76-
public String getGroupId() {
77-
return groupId;
78-
}
92+
public String getGroupId() {
93+
return groupId;
94+
}
7995

80-
public void setGroupId(String groupId) {
81-
this.groupId = groupId;
82-
}
96+
public void setGroupId(String groupId) {
97+
this.groupId = groupId;
98+
}
8399

84-
public String getOffsetReset() {
85-
return offsetReset;
86-
}
100+
public String getOffsetReset() {
101+
return offsetReset;
102+
}
87103

88-
public void setOffsetReset(String offsetReset) {
89-
if(offsetReset == null){
90-
return;
91-
}
104+
public void setOffsetReset(String offsetReset) {
105+
if (offsetReset == null) {
106+
return;
107+
}
92108

93-
this.offsetReset = offsetReset;
94-
}
109+
this.offsetReset = offsetReset;
110+
}
95111

96-
public String getOffset() {
97-
return offset;
98-
}
112+
public String getOffset() {
113+
return offset;
114+
}
99115

100-
public void setOffset(String offset) {
101-
this.offset = offset;
102-
}
116+
public void setOffset(String offset) {
117+
this.offset = offset;
118+
}
103119

104-
@Override
105-
public boolean check() {
106-
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
107-
Preconditions.checkNotNull(topic, "kafka of topic is required");
108-
Preconditions.checkNotNull(groupId, "kafka of groupId is required");
109-
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
110-
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
120+
@Override
121+
public boolean check() {
122+
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
123+
Preconditions.checkNotNull(topic, "kafka of topic is required");
124+
//Preconditions.checkNotNull(groupId, "kafka of groupId is required");
125+
Preconditions.checkState(offsetReset.equalsIgnoreCase("latest")
126+
|| offsetReset.equalsIgnoreCase("latest"), "kafka of offsetReset set fail");
111127

112-
return false;
113-
}
128+
return false;
129+
}
114130

115-
@Override
116-
public String getType() {
131+
@Override
132+
public String getType() {
117133
// return super.getType() + SOURCE_SUFFIX;
118-
return super.getType();
119-
}
134+
return super.getType();
135+
}
120136
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerKafka010Consumer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.apache.flink.types.Row;
3131
import org.apache.flink.util.SerializedValue;
3232

33+
import java.util.Arrays;
3334
import java.util.Map;
3435
import java.util.Properties;
36+
import java.util.regex.Pattern;
3537

3638
/**
3739
* Reason:
@@ -47,10 +49,14 @@ public class CustomerKafka010Consumer extends FlinkKafkaConsumer010<Row> {
4749
private CustomerJsonDeserialization customerJsonDeserialization;
4850

4951
public CustomerKafka010Consumer(String topic, AbsDeserialization valueDeserializer, Properties props) {
50-
super(topic, valueDeserializer, props);
52+
super(Arrays.asList(topic.split(",")), valueDeserializer, props);
5153
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
5254
}
5355

56+
public CustomerKafka010Consumer(Pattern subscriptionPattern, AbsDeserialization<Row> valueDeserializer, Properties props) {
57+
super(subscriptionPattern, valueDeserializer, props);
58+
this.customerJsonDeserialization = (CustomerJsonDeserialization) valueDeserializer;
59+
}
5460
@Override
5561
public void run(SourceContext<Row> sourceContext) throws Exception {
5662
customerJsonDeserialization.setRuntimeContext(getRuntimeContext());

0 commit comments

Comments
 (0)