
Commit bdd2fd8

committed
Merge branch 'feat_1.10_4.2.x_kafkaSampling' into '1.10_test_4.2.x_0426'
[feat-1444][kafka] add offsetEnd parameter to kafka See merge request dt-insight-engine/flinkStreamSQL!270
2 parents d57f245 + 6b44aa3 commit bdd2fd8

13 files changed

Lines changed: 265 additions & 8 deletions


docs/plugin/kafkaSource.md

Lines changed: 18 additions & 0 deletions
@@ -44,6 +44,7 @@ CREATE TABLE tableName(
 |topic | name of the topic to read |||
 |topicIsPattern | whether the topic is a regular-expression pattern (true|false) |no| false
 |offsetReset | initial offset position in the topic [latest|earliest|specific offsets ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]||latest|
+|offsetEnd | offset position at which the job stops consuming |||
 |parallelism | parallelism setting ||1|
 |sourcedatatype | data format: avro, csv, json or dt_nest; dt_nest is the default JSON parser and can parse nested JSON, the others only support flat formats ||dt_nest|
 |schemaInfo | schema used by the avro format |||
@@ -104,6 +105,23 @@ CREATE TABLE MyTable(
     parallelism ='1',
     sourcedatatype ='json' #optional
 );
+
+CREATE TABLE two
+(
+    id int,
+    name string,
+    message string
+) WITH (
+    type = 'kafka11',
+    bootstrapServers = 'kudu1:9092,kudu2:9092,kudu3:9092',
+    zookeeperQuorum = 'kudu1:2181,kudu2:2181,kudu3:2181/kafka',
+    offsetReset = '{"0": 0,"1": 0,"2":0}',
+    -- offsetReset = '{"0": 34689}',
+    -- offsetReset = 'earliest',
+    offsetEnd = '{"0": 100,"1": 100,"2":100}',
+    -- offsetEnd = '{"0": 34789}',
+    topic = 'kafka'
+);
 ```
 ## 6. Nested JSON and data-type field parsing
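Taken together, `offsetReset` and `offsetEnd` bound the range read from each partition: in the example above, each partition is consumed from offset 0 up to, but not including, offset 100, because the deserialization wrapper introduced in this commit discards any record whose offset is at or beyond the configured end.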

DtKafkaDeserializationSchemaWrapper.java

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author tiezhu
 * @date 2021/4/26
 * Company dtstack
 */
public class DtKafkaDeserializationSchemaWrapper<T> extends KafkaDeserializationSchemaWrapper<T> {

    private final Map<KafkaTopicPartition, Long> specificEndOffsets;

    private final List<Integer> endPartition = new ArrayList<>();

    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema,
                                               Map<KafkaTopicPartition, Long> specificEndOffsets) {
        super(deserializationSchema);
        this.specificEndOffsets = specificEndOffsets;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        KafkaTopicPartition topicPartition = new KafkaTopicPartition(record.topic(), record.partition());
        if (endPartition.contains(record.partition())) {
            return null;
        }
        Long endOffset = specificEndOffsets.get(topicPartition);
        if (record.offset() >= endOffset) {
            endPartition.add(record.partition());
            return null;
        }
        return super.deserialize(record);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return super.isEndOfStream(nextElement)
                || endPartition.size() == specificEndOffsets.size();
    }
}
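This wrapper is the core of the feature: a record at or beyond its partition's end offset deserializes to `null` (which the consumer drops) and marks the partition finished, and `isEndOfStream` reports true once every configured partition is finished. As committed, `deserialize` assumes `specificEndOffsets` is non-null and holds an entry for every partition it will see; otherwise `specificEndOffsets.get(topicPartition)` returns `null` and the `>=` comparison throws a `NullPointerException`. A null-safe variant might look like this (a sketch against the same fields, not part of the commit):

```java
// Hypothetical null-safe variant of deserialize (not in the commit); it
// reuses the class's specificEndOffsets and endPartition fields.
@Override
public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    if (specificEndOffsets == null) {
        // No offsetEnd configured: behave like the plain wrapper.
        return super.deserialize(record);
    }
    if (endPartition.contains(record.partition())) {
        return null;
    }
    Long endOffset = specificEndOffsets.get(
            new KafkaTopicPartition(record.topic(), record.partition()));
    if (endOffset != null && record.offset() >= endOffset) {
        // Reached the configured end for this partition: stop emitting from it.
        endPartition.add(record.partition());
        return null;
    }
    return super.deserialize(record);
}
```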

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 34 additions & 0 deletions
@@ -23,9 +23,13 @@
 import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
 import com.dtstack.flink.sql.table.AbstractSourceParser;
 import com.dtstack.flink.sql.table.AbstractTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
 import com.dtstack.flink.sql.util.MathUtil;
+import com.dtstack.flink.sql.util.PluginUtil;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;

@@ -52,6 +56,12 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
         kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
         kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
         kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
+
+        String offsetEnd = MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSET_END_KEY.toLowerCase()));
+        if (DtStringUtil.isJson(offsetEnd)) {
+            kafkaSourceTableInfo.setSpecificEndOffsets(buildOffsetMap(offsetEnd, kafkaSourceTableInfo.getTopic()));
+        }
+
         kafkaSourceTableInfo.setCharsetName(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CHARSET_NAME_KEY.toLowerCase(), "UTF-8")));

         kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
@@ -73,4 +83,28 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S

         return kafkaSourceTableInfo;
     }
+
+    /**
+     * Build the kafka end-offset map, e.g. {"0":12312,"1":12321,"2":12312}
+     *
+     * @param offsetJson offset json
+     * @param topicName  kafka topic
+     * @return offset map
+     */
+    protected Map<KafkaTopicPartition, Long> buildOffsetMap(String offsetJson, String topicName) {
+        try {
+            Properties properties = PluginUtil.jsonStrToObject(offsetJson, Properties.class);
+            Map<String, Object> offsetMap = PluginUtil.objectToMap(properties);
+
+            return offsetMap
+                    .entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            (Map.Entry<String, Object> entry) -> new KafkaTopicPartition(topicName, Integer.parseInt(entry.getKey())),
+                            (Map.Entry<String, Object> entry) -> Long.valueOf(entry.getValue().toString()))
+                    );
+        } catch (Exception e) {
+            throw new RuntimeException("unsupported offsetEnd value: " + offsetJson, e);
+        }
+    }
 }
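`buildOffsetMap` re-keys the `offsetEnd` JSON by `(topic, partition)`. Here is a standalone sketch of the same transformation, using Jackson's `ObjectMapper` directly instead of the project's `PluginUtil` helpers (an assumption for illustration; the committed code goes through `jsonStrToObject` and `objectToMap`):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;

public class OffsetEndExample {
    public static void main(String[] args) throws Exception {
        String offsetJson = "{\"0\": 100, \"1\": 100, \"2\": 100}";
        String topic = "kafka";

        // Parse the JSON into raw partition-number -> offset entries.
        Map<String, Object> raw = new ObjectMapper().readValue(offsetJson, Map.class);

        // Re-key each entry by (topic, partition).
        Map<KafkaTopicPartition, Long> endOffsets = new HashMap<>();
        for (Map.Entry<String, Object> entry : raw.entrySet()) {
            endOffsets.put(
                    new KafkaTopicPartition(topic, Integer.parseInt(entry.getKey())),
                    Long.valueOf(entry.getValue().toString()));
        }

        // Prints {KafkaTopicPartition{topic='kafka', partition=0}=100, ...}
        System.out.println(endOffsets);
    }
}
```

So `offsetEnd = '{"0": 100,"1": 100,"2":100}'` on topic `kafka` yields the three entries the wrapper consults for each incoming record.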

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 14 additions & 1 deletion
@@ -21,6 +21,7 @@

 import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
 import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

 import java.util.HashMap;
 import java.util.Map;
@@ -45,6 +46,8 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {

     public static final String OFFSETRESET_KEY = "offsetReset";

+    public static final String OFFSET_END_KEY = "offsetEnd";
+
     public static final String TOPICISPATTERN_KEY = "topicIsPattern";

     public static final String SCHEMA_STRING_KEY = "schemaInfo";
@@ -79,6 +82,8 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {

     private Long timestampOffset;

+    private Map<KafkaTopicPartition, Long> specificEndOffsets;
+
     public String getBootstrapServers() {
         return bootstrapServers;
     }
@@ -171,7 +176,15 @@ public void setTimestampOffset(Long timestampOffset) {
         this.timestampOffset = timestampOffset;
     }

-    @Override
+    public Map<KafkaTopicPartition, Long> getSpecificEndOffsets() {
+        return specificEndOffsets;
+    }
+
+    public void setSpecificEndOffsets(Map<KafkaTopicPartition, Long> specificEndOffsets) {
+        this.specificEndOffsets = specificEndOffsets;
+    }
+
+    @Override
     public boolean check() {
         Preconditions.checkNotNull(getType(), "kafka of type is required");
         Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
DtKafkaDeserializationSchemaWrapperTest.java

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author tiezhu
 * @date 2021/4/27 4:33 PM
 */
public class DtKafkaDeserializationSchemaWrapperTest {
    public Map<KafkaTopicPartition, Long> specificEndOffsets;

    private DtKafkaDeserializationSchemaWrapper<String> kafkaDeserializationSchemaWrapper;

    @Before
    public void setUp() {
        String topic = "test";
        Map<String, Object> valueMap = new HashMap<>();

        valueMap.put("1", 10);

        specificEndOffsets = valueMap
                .entrySet()
                .stream()
                .collect(Collectors.toMap(
                        (Map.Entry<String, Object> entry)
                                -> new KafkaTopicPartition(topic, Integer.parseInt(entry.getKey())),
                        (Map.Entry<String, Object> entry)
                                -> Long.valueOf(entry.getValue().toString()))
                );

        kafkaDeserializationSchemaWrapper = new DtKafkaDeserializationSchemaWrapper<>(new SimpleStringSchema(), specificEndOffsets);
    }

    @Test
    public void testIsEndOfStream() {
        String topic = "test";

        Assert.assertFalse(kafkaDeserializationSchemaWrapper.isEndOfStream(topic));
    }

    @Test
    public void testDeserialize() throws Exception {
        String str = "test";
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "test",
                1,
                11,
                str.getBytes(StandardCharsets.UTF_8),
                str.getBytes(StandardCharsets.UTF_8)
        );

        String deserialize = kafkaDeserializationSchemaWrapper.deserialize(record);

        Assert.assertNull(deserialize);
    }
}
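The committed test exercises the boundary: partition 1 is configured to end at offset 10, so a record at offset 11 comes back `null`. A companion case one might add (hypothetical, not part of the commit) checks that a record below the end offset still passes through `SimpleStringSchema`:

```java
// Hypothetical companion test (not in the commit): offset 5 is below the
// configured end offset 10 for partition 1, so the payload should survive.
@Test
public void testDeserializeBeforeEndOffset() throws Exception {
    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "test",
            1,
            5,
            "key".getBytes(StandardCharsets.UTF_8),
            "value".getBytes(StandardCharsets.UTF_8)
    );

    Assert.assertEquals("value", kafkaDeserializationSchemaWrapper.deserialize(record));
}
```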

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer.java

Lines changed: 12 additions & 3 deletions
@@ -18,6 +18,8 @@

 package com.dtstack.flink.sql.source.kafka;

+import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
@@ -32,9 +34,6 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.SerializedValue;

-import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
@@ -56,6 +55,16 @@ public KafkaConsumer(String topic, DeserializationMetricWrapper deserializationM
         this.deserializationMetricWrapper = deserializationMetricWrapper;
     }

+    public KafkaConsumer(String topic,
+                         DeserializationMetricWrapper deserializationMetricWrapper,
+                         Map<KafkaTopicPartition, Long> specificEndOffsets,
+                         Properties props) {
+        super(Arrays.asList(StringUtils.split(topic, ",")),
+                new DtKafkaDeserializationSchemaWrapper<>(deserializationMetricWrapper, specificEndOffsets),
+                props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
+    }
+
     public KafkaConsumer(Pattern subscriptionPattern, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
         super(subscriptionPattern, deserializationMetricWrapper, props);
         this.deserializationMetricWrapper = deserializationMetricWrapper;

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumerFactory.java

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo k
             kafkaSrc = new KafkaConsumer(Pattern.compile(kafkaSourceTableInfo.getTopic()), deserMetricWrapper, props);
         } else {
             DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> subscriptionState.partitionLag(tp, IsolationLevel.READ_UNCOMMITTED));
-            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, props);
+            kafkaSrc = new KafkaConsumer(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, kafkaSourceTableInfo.getSpecificEndOffsets(), props);
         }
         return kafkaSrc;
     }
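Note that the non-pattern branch now always routes through the end-offset constructor, even when `offsetEnd` was never configured and `getSpecificEndOffsets()` returns null. A call-site guard might look like this (hypothetical, not part of the commit):

```java
// Hypothetical guard (not in the commit): fall back to the two-argument
// constructor when no end offsets were configured.
Map<KafkaTopicPartition, Long> endOffsets = kafkaSourceTableInfo.getSpecificEndOffsets();
kafkaSrc = (endOffsets == null)
        ? new KafkaConsumer(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, props)
        : new KafkaConsumer(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, endOffsets, props);
```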

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09.java

Lines changed: 10 additions & 0 deletions
@@ -57,6 +57,16 @@ public KafkaConsumer09(String topic, DeserializationMetricWrapper deserializatio
         this.deserializationMetricWrapper = deserializationMetricWrapper;
     }

+    public KafkaConsumer09(String topic,
+                           DeserializationMetricWrapper deserializationMetricWrapper,
+                           Map<KafkaTopicPartition, Long> specificEndOffsets,
+                           Properties props) {
+        super(Arrays.asList(StringUtils.split(topic, ",")),
+                new DtKafkaDeserializationSchemaWrapper<>(deserializationMetricWrapper, specificEndOffsets),
+                props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
+    }
+
     public KafkaConsumer09(Pattern subscriptionPattern, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
         super(subscriptionPattern, deserializationMetricWrapper, props);
         this.deserializationMetricWrapper = deserializationMetricWrapper;

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09Factory.java

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ public FlinkKafkaConsumerBase<Row> createKafkaTableSource(KafkaSourceTableInfo k
             kafkaSrc = new KafkaConsumer09(Pattern.compile(kafkaSourceTableInfo.getTopic()), deserMetricWrapper, props);
         } else {
             DeserializationMetricWrapper deserMetricWrapper = createDeserializationMetricWrapper(kafkaSourceTableInfo, typeInformation, (Calculate & Serializable) (subscriptionState, tp) -> 0L);
-            kafkaSrc = new KafkaConsumer09(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, props);
+            kafkaSrc = new KafkaConsumer09(kafkaSourceTableInfo.getTopic(), deserMetricWrapper, kafkaSourceTableInfo.getSpecificEndOffsets(), props);
         }
         return kafkaSrc;
     }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer010.java

Lines changed: 10 additions & 0 deletions
@@ -56,6 +56,16 @@ public KafkaConsumer010(String topic, DeserializationMetricWrapper deserializati
         this.deserializationMetricWrapper = deserializationMetricWrapper;
     }

+    public KafkaConsumer010(String topic,
+                            DeserializationMetricWrapper deserializationMetricWrapper,
+                            Map<KafkaTopicPartition, Long> specificEndOffsets,
+                            Properties props) {
+        super(Arrays.asList(StringUtils.split(topic, ",")),
+                new DtKafkaDeserializationSchemaWrapper<>(deserializationMetricWrapper, specificEndOffsets),
+                props);
+        this.deserializationMetricWrapper = deserializationMetricWrapper;
+    }
+
     public KafkaConsumer010(Pattern subscriptionPattern, DeserializationMetricWrapper deserializationMetricWrapper, Properties props) {
         super(subscriptionPattern, deserializationMetricWrapper, props);
         this.deserializationMetricWrapper = deserializationMetricWrapper;
