Skip to content

Commit 03f1f67

Browse files
committed
fix kafka row to table error
1 parent 2d86ee2 commit 03f1f67

3 files changed

Lines changed: 18 additions & 2 deletions

File tree

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,4 +277,9 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
277277
}
278278
}
279279
}
280+
281+
@Override
282+
public TypeInformation<Row> getProducedType() {
283+
return typeInfo;
284+
}
280285
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
281281
}
282282
}
283283

284-
284+
@Override
285+
public TypeInformation<Row> getProducedType() {
286+
return typeInfo;
287+
}
285288

286289
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import java.util.Map;
5454
import java.util.Set;
5555

56-
import static com.dtstack.flink.sql.metric.MetricConstant.*;
56+
import static com.dtstack.flink.sql.metric.MetricConstant.DT_PARTITION_GROUP;
57+
import static com.dtstack.flink.sql.metric.MetricConstant.DT_TOPIC_GROUP;
58+
import static com.dtstack.flink.sql.metric.MetricConstant.DT_TOPIC_PARTITION_LAG_GAUGE;
5759

5860
/**
5961
* json string parsing custom
@@ -283,4 +285,10 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
283285
}
284286
}
285287
}
288+
289+
@Override
290+
public TypeInformation<Row> getProducedType() {
291+
return typeInfo;
292+
}
293+
286294
}

0 commit comments

Comments
 (0)