Skip to content

Commit 20635f0

Browse files
committed
解决kafka topic不设置报NPE
1 parent e463694 commit 20635f0

2 files changed

Lines changed: 12 additions & 2 deletions

File tree

core/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,18 @@
122122
<artifactId>junit</artifactId>
123123
<version>4.12</version>
124124
</dependency>
125+
<dependency>
126+
<groupId>log4j</groupId>
127+
<artifactId>log4j</artifactId>
128+
<version>1.2.17</version>
129+
</dependency>
130+
131+
<dependency>
132+
<groupId>org.slf4j</groupId>
133+
<artifactId>slf4j-simple</artifactId>
134+
<version>1.7.30</version>
135+
</dependency>
136+
125137
</dependencies>
126138

127139
<build>

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,8 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4848
kafkaSourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4949
kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
5050
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
51-
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
5251
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
5352
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
54-
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
5553
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5654

5755
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));

0 commit comments

Comments
 (0)