Skip to content

Commit a75a0b7

Browse files
committed
fix bug for kafka offset
1 parent a363415 commit a75a0b7

6 files changed

Lines changed: 10 additions & 2 deletions

File tree

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.source.kafka.table;
2222

@@ -46,6 +46,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4646
kafka09SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4747
kafka09SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
4848
kafka09SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
49+
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
4950
return kafka09SourceTableInfo;
5051
}
5152
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class KafkaSourceTableInfo extends SourceTableInfo {
4141

4242
public static final String GROUPID_KEY = "groupId";
4343

44+
public static final String OFFSETRESET_KEY="offsetReset";
45+
4446
private String bootstrapServers;
4547

4648
private String topic;

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4646
kafka10SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4747
kafka10SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
4848
kafka10SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
49+
kafka10SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
4950
return kafka10SourceTableInfo;
5051
}
5152
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class KafkaSourceTableInfo extends SourceTableInfo {
4141

4242
public static final String GROUPID_KEY = "groupId";
4343

44+
public static final String OFFSETRESET_KEY="offsetReset";
45+
4446
private String bootstrapServers;
4547

4648
private String topic;
@@ -89,7 +91,6 @@ public void setOffsetReset(String offsetReset) {
8991
if(offsetReset == null){
9092
return;
9193
}
92-
9394
this.offsetReset = offsetReset;
9495
}
9596

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4646
kafka11SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4747
kafka11SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
4848
kafka11SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
49+
kafka11SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
4950
return kafka11SourceTableInfo;
5051
}
5152
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class KafkaSourceTableInfo extends SourceTableInfo {
4141

4242
public static final String GROUPID_KEY = "groupId";
4343

44+
public static final String OFFSETRESET_KEY="offsetReset";
45+
4446
private String bootstrapServers;
4547

4648
private String topic;

0 commit comments

Comments
 (0)