Skip to content

Commit 25316e3

Browse files
committed
add Kafka charsetName option for decoding source messages
1 parent 210b9c3 commit 25316e3

4 files changed

Lines changed: 22 additions & 4 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,22 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5858
private final String[] fieldNames;
5959
private final TypeInformation<?>[] fieldTypes;
6060
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
61+
private String charsetName;
6162

62-
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
63+
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
64+
List<TableInfo.FieldExtraInfo> fieldExtraInfos,
65+
String charsetName) {
6366
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6467
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6568
this.rowAndFieldMapping = rowAndFieldMapping;
6669
this.fieldExtraInfos = fieldExtraInfos;
70+
this.charsetName = charsetName;
6771
}
6872

6973
@Override
7074
public Row deserialize(byte[] message) throws IOException {
71-
JsonNode root = objectMapper.readTree(message);
75+
String decoderStr = new String(message, charsetName);
76+
JsonNode root = objectMapper.readTree(decoderStr);
7277
this.parseTree(root, null);
7378
Row row = new Row(fieldNames.length);
7479

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaS
5656
private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
5757
DeserializationSchema<Row> deserializationSchema = null;
5858
if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
59-
deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(), kafkaSourceTableInfo.getFieldExtraInfoList());
60-
59+
deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(),
60+
kafkaSourceTableInfo.getFieldExtraInfoList(),kafkaSourceTableInfo.getCharsetName());
6161
} else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
6262

6363
if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
5252
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
5353
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
5454
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
55+
kafkaSourceTableInfo.setCharsetName(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CHARSET_NAME_KEY.toLowerCase(),"UTF-8")));
5556
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5657
for (String key : props.keySet()) {
5758
if (!key.isEmpty() && key.startsWith("kafka.")) {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class KafkaSourceTableInfo extends SourceTableInfo {
4949

5050
public static final String TOPICISPATTERN_KEY = "topicIsPattern";
5151

52+
public static final String CHARSET_NAME_KEY = "charsetName";
53+
5254
private String bootstrapServers;
5355

5456
private String topic;
@@ -68,6 +70,8 @@ public class KafkaSourceTableInfo extends SourceTableInfo {
6870

6971
private String fieldDelimiter;
7072

73+
private String charsetName;
74+
7175
public String getBootstrapServers() {
7276
return bootstrapServers;
7377
}
@@ -160,6 +164,14 @@ public void setFieldDelimiter(String fieldDelimiter) {
160164
this.fieldDelimiter = fieldDelimiter;
161165
}
162166

167+
public String getCharsetName() {
168+
return charsetName;
169+
}
170+
171+
public void setCharsetName(String charsetName) {
172+
this.charsetName = charsetName;
173+
}
174+
163175
@Override
164176
public boolean check() {
165177
Preconditions.checkNotNull(getType(), "kafka of type is required");

0 commit comments

Comments (0)