Skip to content

Commit 63a804f

Browse files
committed
[feat-144][kafka] 修复没有 offsetEnd 参数时导致的空指针异常
1 parent dc2873b commit 63a804f

1 file changed

Lines changed: 11 additions & 6 deletions

File tree

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/DtKafkaDeserializationSchemaWrapper.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,21 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
5151
if (endPartition.contains(record.partition())) {
5252
return null;
5353
}
54-
Long endOffset = specificEndOffsets.get(topicPartition);
55-
if (record.offset() >= endOffset) {
56-
endPartition.add(record.partition());
57-
return null;
54+
if (specificEndOffsets != null) {
55+
Long endOffset = specificEndOffsets.get(topicPartition);
56+
if (record.offset() >= endOffset) {
57+
endPartition.add(record.partition());
58+
return null;
59+
}
5860
}
61+
5962
return super.deserialize(record);
6063
}
6164

6265
public boolean isEndOfStream(T nextElement) {
63-
return super.isEndOfStream(nextElement)
64-
|| endPartition.size() == specificEndOffsets.size();
66+
boolean isEnd =
67+
specificEndOffsets != null
68+
&& endPartition.size() == specificEndOffsets.size();
69+
return super.isEndOfStream(nextElement) || isEnd;
6570
}
6671
}

0 commit comments

Comments
 (0)