Skip to content

Commit 3e1f7a1

Browse files
committed
【fix】add charsetName
1 parent 603d5bc commit 3e1f7a1

3 files changed

Lines changed: 7 additions & 4 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5353

5454
private final ObjectMapper objectMapper = new ObjectMapper();
5555

56-
private Map<String, String> rowAndFieldMapping;
57-
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
56+
private final Map<String, String> rowAndFieldMapping;
57+
private final Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
5858

5959
private final String[] fieldNames;
6060
private final TypeInformation<?>[] fieldTypes;
61-
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
62-
private String charsetName;
61+
private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
62+
private final String charsetName;
6363

6464
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
6565
List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5353
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
5454
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
5555
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
56+
kafkaSourceTableInfo.setCharsetName(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CHARSET_NAME_KEY.toLowerCase(),"UTF-8")));
5657

5758
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
5859
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
5454

5555
public static final String SOURCE_DATA_TYPE_KEY = "sourceDataType";
5656

57+
public static final String CHARSET_NAME_KEY = "charsetName";
58+
5759
private String bootstrapServers;
5860

5961
private String topic;

0 commit comments

Comments
 (0)