Skip to content

Commit 603d5bc

Browse files
committed
【feat】tableSource 添加对 charset 的支持
1 parent 21b0e2a commit 603d5bc

3 files changed

Lines changed: 31 additions & 19 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.flink.types.Row;
3939

4040
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
41-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
4241
import com.dtstack.flink.sql.enums.ClusterMode;
4342
import com.dtstack.flink.sql.enums.ECacheType;
4443
import com.dtstack.flink.sql.enums.EPluginLoadMode;

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
/**
4545
* source data parse to json format
46-
*
46+
* <p>
4747
* Date: 2019/12/12
4848
* Company: www.dtstack.com
4949
*
@@ -58,9 +58,12 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5858

5959
private final String[] fieldNames;
6060
private final TypeInformation<?>[] fieldTypes;
61-
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
61+
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
62+
private String charsetName;
6263

63-
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
64+
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
65+
List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
66+
String charsetName) {
6467
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6568
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6669
this.rowAndFieldMapping = rowAndFieldMapping;
@@ -99,7 +102,7 @@ public Row deserialize(byte[] message) throws IOException {
99102
}
100103
}
101104

102-
private void parseTree(JsonNode jsonNode, String prefix){
105+
private void parseTree(JsonNode jsonNode, String prefix) {
103106
if (jsonNode.isArray()) {
104107
ArrayNode array = (ArrayNode) jsonNode;
105108
for (int i = 0; i < array.size(); i++) {
@@ -118,15 +121,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
118121
return;
119122
}
120123
Iterator<String> iterator = jsonNode.fieldNames();
121-
while (iterator.hasNext()){
124+
while (iterator.hasNext()) {
122125
String next = iterator.next();
123126
JsonNode child = jsonNode.get(next);
124127
String nodeKey = getNodeKey(prefix, next);
125128

126129
nodeAndJsonNodeMapping.put(nodeKey, child);
127-
if(child.isArray()){
130+
if (child.isArray()) {
128131
parseTree(child, nodeKey);
129-
}else {
132+
} else {
130133
parseTree(child, nodeKey);
131134
}
132135
}
@@ -137,8 +140,8 @@ private JsonNode getIgnoreCase(String key) {
137140
return nodeAndJsonNodeMapping.get(nodeMappingKey);
138141
}
139142

140-
private String getNodeKey(String prefix, String nodeName){
141-
if(Strings.isNullOrEmpty(prefix)){
143+
private String getNodeKey(String prefix, String nodeName) {
144+
if (Strings.isNullOrEmpty(prefix)) {
142145
return nodeName;
143146
}
144147
return prefix + "." + nodeName;
@@ -162,15 +165,15 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
162165
} else {
163166
return node.asText();
164167
}
165-
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
168+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
166169
return Date.valueOf(node.asText());
167170
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
168171
// local zone
169172
return Time.valueOf(node.asText());
170173
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
171174
// local zone
172175
return Timestamp.valueOf(node.asText());
173-
} else {
176+
} else {
174177
// for types that were specified without JSON schema
175178
// e.g. POJOs
176179
try {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flink.sql.source.kafka.table;
2121

22+
import com.dtstack.flink.sql.format.FormatType;
2223
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
2324
import com.google.common.base.Preconditions;
2425

@@ -71,6 +72,7 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
7172

7273
public Map<String, String> kafkaParams = new HashMap<>();
7374

75+
public String charsetName;
7476

7577
public String getBootstrapServers() {
7678
return bootstrapServers;
@@ -148,11 +150,19 @@ public void setFieldDelimiter(String fieldDelimiter) {
148150
this.fieldDelimiter = fieldDelimiter;
149151
}
150152

151-
@Override
152-
public boolean check() {
153-
Preconditions.checkNotNull(getType(), "kafka of type is required");
154-
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
155-
Preconditions.checkNotNull(topic, "kafka of topic is required");
156-
return false;
157-
}
153+
public String getCharsetName() {
154+
return charsetName;
155+
}
156+
157+
public void setCharsetName(String charsetName) {
158+
this.charsetName = charsetName;
159+
}
160+
161+
@Override
162+
public boolean check() {
163+
Preconditions.checkNotNull(getType(), "kafka of type is required");
164+
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
165+
Preconditions.checkNotNull(topic, "kafka of topic is required");
166+
return false;
167+
}
158168
}

0 commit comments

Comments
 (0)