Skip to content

Commit 277bedc

Browse files
zhihui-ge戈志辉
authored and committed
update kafka source
1 parent bd3b789 commit 277bedc

4 files changed

Lines changed: 54 additions & 3 deletions

File tree

docs/kafkaSource.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ CREATE TABLE tableName(
3838
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
3939
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
4040
|topic | 需要读取的 topic 名称|||
41-
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest]||latest|
41+
|offsetReset | 读取的topic 的offset初始位置[latest\|earliest\|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
4242
|parallelism | 并行度设置||1|
4343

4444
## 5.样例:

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,20 @@
2323
import com.dtstack.flink.sql.source.IStreamSourceGener;
2424
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
2525
import com.dtstack.flink.sql.table.SourceTableInfo;
26+
import com.dtstack.flink.sql.util.PluginUtil;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.flink.api.common.functions.RuntimeContext;
2829
import org.apache.flink.api.common.typeinfo.TypeInformation;
2930
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3031
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3132
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
33+
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
3234
import org.apache.flink.table.api.Table;
3335
import org.apache.flink.table.api.java.StreamTableEnvironment;
3436
import org.apache.flink.types.Row;
3537

38+
import java.util.HashMap;
39+
import java.util.Map;
3640
import java.util.Properties;
3741

3842
/**
@@ -76,7 +80,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
7680
//earliest,latest
7781
if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){
7882
kafkaSrc.setStartFromEarliest();
79-
}else{
83+
}else if(kafka09SourceTableInfo.getOffsetReset().startsWith("{")){
84+
try {
85+
// {"0":12312,"1":12321,"2":12312}
86+
Properties properties = PluginUtil.jsonStrToObject(kafka09SourceTableInfo.getOffsetReset(), Properties.class);
87+
Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
88+
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
89+
for(Map.Entry<String,Object> entry:offsetMap.entrySet()){
90+
specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString()));
91+
}
92+
kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
93+
} catch (Exception e) {
94+
throw new RuntimeException("not support offsetReset type:" + kafka09SourceTableInfo.getOffsetReset());
95+
}
96+
}else {
8097
kafkaSrc.setStartFromLatest();
8198
}
8299

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,19 @@
2323
import com.dtstack.flink.sql.source.IStreamSourceGener;
2424
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
2525
import com.dtstack.flink.sql.table.SourceTableInfo;
26+
import com.dtstack.flink.sql.util.PluginUtil;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.flink.api.common.typeinfo.TypeInformation;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3031
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
32+
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
3133
import org.apache.flink.table.api.Table;
3234
import org.apache.flink.table.api.java.StreamTableEnvironment;
3335
import org.apache.flink.types.Row;
3436

37+
import java.util.HashMap;
38+
import java.util.Map;
3539
import java.util.Properties;
3640

3741
/**
@@ -75,7 +79,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
7579
//earliest,latest
7680
if("earliest".equalsIgnoreCase(kafka010SourceTableInfo.getOffsetReset())){
7781
kafkaSrc.setStartFromEarliest();
78-
}else{
82+
}else if(kafka010SourceTableInfo.getOffsetReset().startsWith("{")){
83+
try {
84+
// {"0":12312,"1":12321,"2":12312}
85+
Properties properties = PluginUtil.jsonStrToObject(kafka010SourceTableInfo.getOffsetReset(), Properties.class);
86+
Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
87+
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
88+
for(Map.Entry<String,Object> entry:offsetMap.entrySet()){
89+
specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString()));
90+
}
91+
kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
92+
} catch (Exception e) {
93+
throw new RuntimeException("not support offsetReset type:" + kafka010SourceTableInfo.getOffsetReset());
94+
}
95+
}else {
7996
kafkaSrc.setStartFromLatest();
8097
}
8198

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,19 @@
2323
import com.dtstack.flink.sql.source.IStreamSourceGener;
2424
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
2525
import com.dtstack.flink.sql.table.SourceTableInfo;
26+
import com.dtstack.flink.sql.util.PluginUtil;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.flink.api.common.typeinfo.TypeInformation;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3031
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
32+
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
3133
import org.apache.flink.table.api.Table;
3234
import org.apache.flink.table.api.java.StreamTableEnvironment;
3335
import org.apache.flink.types.Row;
3436

37+
import java.util.HashMap;
38+
import java.util.Map;
3539
import java.util.Properties;
3640

3741
/**
@@ -75,6 +79,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
7579
//earliest,latest
7680
if("earliest".equalsIgnoreCase(kafka011SourceTableInfo.getOffsetReset())){
7781
kafkaSrc.setStartFromEarliest();
82+
}else if(kafka011SourceTableInfo.getOffsetReset().startsWith("{")){
83+
try {
84+
// {"0":12312,"1":12321,"2":12312}
85+
Properties properties = PluginUtil.jsonStrToObject(kafka011SourceTableInfo.getOffsetReset(), Properties.class);
86+
Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
87+
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
88+
for(Map.Entry<String,Object> entry:offsetMap.entrySet()){
89+
specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString()));
90+
}
91+
kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
92+
} catch (Exception e) {
93+
throw new RuntimeException("not support offsetReset type:" + kafka011SourceTableInfo.getOffsetReset());
94+
}
7895
}else{
7996
kafkaSrc.setStartFromLatest();
8097
}

0 commit comments

Comments
 (0)