Skip to content

Commit f66e56e

Browse files
author
yanxi
committed
Merge branch 'v1.5.0_dev_merge' into 'v1.5.0_dev'
V1.5.0 dev merge. See merge request !11
2 parents 873884e + 4086e41 commit f66e56e

14 files changed

Lines changed: 313 additions & 46 deletions

File tree

README.md

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,10 +14,13 @@
1414
* 结果表:mysql,SQlServer,oracle,hbase,elasticsearch5.x,mongo,redis,cassandra
1515

1616
# 后续开发计划
17-
* 增加kafka结果表功能
1817
* 增加SQL支持CEP
1918
* 维表快照
2019
* sql优化(谓词下移等)
20+
* serverSocket 源表
21+
* console 结果表
22+
* kafka avro格式
23+
* topN
2124

2225
## 1 快速起步
2326
### 1.1 运行模式

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ protected boolean fieldNameNeedsUpperCase() {
5656
return true;
5757
}
5858

59-
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props);
59+
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception;
6060

6161
public boolean dealKeyPattern(String fieldRow, TableInfo tableInfo){
6262
for(Map.Entry<String, Pattern> keyPattern : keyPatternMap.entrySet()){

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,9 +21,11 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
24+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2425

2526
import java.io.Serializable;
2627
import java.util.List;
28+
import java.util.Map;
2729

2830
/**
2931
* Reason:
@@ -48,6 +50,9 @@ public abstract class TableInfo implements Serializable {
4850

4951
private final List<String> fieldList = Lists.newArrayList();
5052

53+
/**key:别名, value: realField */
54+
private Map<String, String> physicalFields = Maps.newHashMap();
55+
5156
private final List<String> fieldTypeList = Lists.newArrayList();
5257

5358
private final List<Class> fieldClassList = Lists.newArrayList();
@@ -114,6 +119,10 @@ public void addField(String fieldName){
114119
fieldList.add(fieldName);
115120
}
116121

122+
public void addPhysicalMappings(String aliasName, String physicalFieldName){
123+
physicalFields.put(aliasName, physicalFieldName);
124+
}
125+
117126
public void addFieldClass(Class fieldClass){
118127
fieldClassList.add(fieldClass);
119128
}
@@ -146,6 +155,14 @@ public List<Class> getFieldClassList() {
146155
return fieldClassList;
147156
}
148157

158+
public Map<String, String> getPhysicalFields() {
159+
return physicalFields;
160+
}
161+
162+
public void setPhysicalFields(Map<String, String> physicalFields) {
163+
this.physicalFields = physicalFields;
164+
}
165+
149166
public void finish(){
150167
this.fields = fieldList.toArray(new String[fieldList.size()]);
151168
this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 51 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,12 @@
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2829
import org.apache.flink.metrics.MetricGroup;
30+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2931
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3032
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
3134
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3235
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3336
import org.apache.flink.types.Row;
@@ -40,6 +43,7 @@
4043
import java.io.IOException;
4144
import java.lang.reflect.Field;
4245
import java.util.Iterator;
46+
import java.util.Map;
4347
import java.util.Set;
4448

4549
import static com.dtstack.flink.sql.metric.MetricConstant.*;
@@ -75,12 +79,19 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7579

7680
private boolean firstMsg = true;
7781

78-
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
82+
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
83+
84+
private Map<String, String> rowAndFieldMapping;
85+
86+
87+
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
7988
this.typeInfo = typeInfo;
8089

8190
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
8291

8392
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
93+
94+
this.rowAndFieldMapping= rowAndFieldMapping;
8495
}
8596

8697
@Override
@@ -101,9 +112,11 @@ public Row deserialize(byte[] message) throws IOException {
101112
numInBytes.inc(message.length);
102113

103114
JsonNode root = objectMapper.readTree(message);
115+
parseTree(root, null);
104116
Row row = new Row(fieldNames.length);
117+
105118
for (int i = 0; i < fieldNames.length; i++) {
106-
JsonNode node = getIgnoreCase(root, fieldNames[i]);
119+
JsonNode node = getIgnoreCase(fieldNames[i]);
107120

108121
if (node == null) {
109122
if (failOnMissingField) {
@@ -125,25 +138,54 @@ public Row deserialize(byte[] message) throws IOException {
125138
//add metric of dirty data
126139
dirtyDataCounter.inc();
127140
return null;
141+
}finally {
142+
nodeAndJsonNodeMapping.clear();
128143
}
129144
}
130145

131146
public void setFailOnMissingField(boolean failOnMissingField) {
132147
this.failOnMissingField = failOnMissingField;
133148
}
134149

135-
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
150+
private JsonNode getIgnoreCase(String key) {
151+
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
152+
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
153+
if(node == null){
154+
return null;
155+
}
136156

137-
Iterator<String> iter = jsonNode.fieldNames();
138-
while (iter.hasNext()) {
139-
String key1 = iter.next();
140-
if (key1.equalsIgnoreCase(key)) {
141-
return jsonNode.get(key1);
157+
JsonNodeType nodeType = node.getNodeType();
158+
159+
if (nodeType == JsonNodeType.ARRAY){
160+
throw new IllegalStateException("Unsupported type information array .") ;
161+
}
162+
163+
return node;
164+
}
165+
166+
167+
private void parseTree(JsonNode jsonNode, String prefix){
168+
169+
Iterator<String> iterator = jsonNode.fieldNames();
170+
while (iterator.hasNext()){
171+
String next = iterator.next();
172+
JsonNode child = jsonNode.get(next);
173+
String nodeKey = getNodeKey(prefix, next);
174+
175+
if (child.isValueNode()){
176+
nodeAndJsonNodeMapping.put(nodeKey, child);
177+
}else {
178+
parseTree(child, nodeKey);
142179
}
143180
}
181+
}
144182

145-
return null;
183+
private String getNodeKey(String prefix, String nodeName){
184+
if(Strings.isNullOrEmpty(prefix)){
185+
return nodeName;
186+
}
146187

188+
return prefix + "." + nodeName;
147189
}
148190

149191
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,11 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6868

6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
71-
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
71+
if (DtStringUtil.isJosn(kafka09SourceTableInfo.getOffsetReset())){
72+
props.setProperty("auto.offset.reset", "none");
73+
} else {
74+
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
75+
}
7276
if (StringUtils.isNotBlank(kafka09SourceTableInfo.getGroupId())){
7377
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
7478
}
@@ -84,10 +88,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
8488
FlinkKafkaConsumer09<Row> kafkaSrc;
8589
if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
8690
kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
87-
new CustomerJsonDeserialization(typeInformation), props);
91+
new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
8892
} else {
8993
kafkaSrc = new CustomerKafka09Consumer(topicName,
90-
new CustomerJsonDeserialization(typeInformation), props);
94+
new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
9195
}
9296

9397
//earliest,latest

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 33 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,10 +21,14 @@
2121
package com.dtstack.flink.sql.source.kafka.table;
2222

2323
import com.dtstack.flink.sql.table.AbsSourceParser;
24+
import com.dtstack.flink.sql.table.SourceTableInfo;
2425
import com.dtstack.flink.sql.table.TableInfo;
26+
import com.dtstack.flink.sql.util.ClassUtil;
2527
import com.dtstack.flink.sql.util.MathUtil;
2628

2729
import java.util.Map;
30+
import java.util.regex.Matcher;
31+
import java.util.regex.Pattern;
2832

2933
/**
3034
* Reason:
@@ -35,15 +39,42 @@
3539

3640
public class KafkaSourceParser extends AbsSourceParser {
3741

42+
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
43+
44+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
45+
46+
static {
47+
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
48+
49+
keyHandlerMap.put(KAFKA_NEST_FIELD_KEY, KafkaSourceParser::dealNestField);
50+
}
51+
52+
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
53+
String physicalField = matcher.group(1);
54+
String fieldType = matcher.group(3);
55+
String mappingField = matcher.group(4);
56+
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
57+
58+
tableInfo.addPhysicalMappings(mappingField, physicalField);
59+
tableInfo.addField(mappingField);
60+
tableInfo.addFieldClass(fieldClass);
61+
tableInfo.addFieldType(fieldType);
62+
}
63+
3864
@Override
39-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
65+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception {
4066

4167
KafkaSourceTableInfo kafka09SourceTableInfo = new KafkaSourceTableInfo();
4268
kafka09SourceTableInfo.setName(tableName);
4369
parseFieldsInfo(fieldsInfo, kafka09SourceTableInfo);
4470

4571
kafka09SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
46-
kafka09SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
72+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()));
73+
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
74+
throw new Exception("BootstrapServers can not be empty!");
75+
} else {
76+
kafka09SourceTableInfo.setBootstrapServers(bootstrapServer);
77+
}
4778
kafka09SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
4879
kafka09SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
4980
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 51 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,12 @@
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2829
import org.apache.flink.metrics.MetricGroup;
30+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2931
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3032
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
3134
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3235
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3336
import org.apache.flink.types.Row;
@@ -40,6 +43,7 @@
4043
import java.io.IOException;
4144
import java.lang.reflect.Field;
4245
import java.util.Iterator;
46+
import java.util.Map;
4347
import java.util.Set;
4448

4549
import static com.dtstack.flink.sql.metric.MetricConstant.*;
@@ -75,12 +79,18 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7579

7680
private boolean firstMsg = true;
7781

78-
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
82+
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
83+
84+
private Map<String, String> rowAndFieldMapping;
85+
86+
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
7987
this.typeInfo = typeInfo;
8088

8189
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
8290

8391
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
92+
93+
this.rowAndFieldMapping= rowAndFieldMapping;
8494
}
8595

8696
@Override
@@ -101,9 +111,11 @@ public Row deserialize(byte[] message) throws IOException {
101111
numInBytes.inc(message.length);
102112

103113
JsonNode root = objectMapper.readTree(message);
114+
parseTree(root, null);
104115
Row row = new Row(fieldNames.length);
116+
105117
for (int i = 0; i < fieldNames.length; i++) {
106-
JsonNode node = getIgnoreCase(root, fieldNames[i]);
118+
JsonNode node = getIgnoreCase(fieldNames[i]);
107119

108120
if (node == null) {
109121
if (failOnMissingField) {
@@ -125,25 +137,55 @@ public Row deserialize(byte[] message) throws IOException {
125137
//add metric of dirty data
126138
dirtyDataCounter.inc();
127139
return null;
140+
}finally {
141+
nodeAndJsonNodeMapping.clear();
128142
}
129143
}
130144

145+
public JsonNode getIgnoreCase(String key) {
146+
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
147+
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
148+
149+
if(node == null){
150+
return null;
151+
}
152+
153+
JsonNodeType nodeType = node.getNodeType();
154+
155+
if (nodeType==JsonNodeType.ARRAY){
156+
throw new IllegalStateException("Unsupported type information array .") ;
157+
}
158+
159+
return node;
160+
}
161+
162+
131163
public void setFailOnMissingField(boolean failOnMissingField) {
132164
this.failOnMissingField = failOnMissingField;
133165
}
134166

135-
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
167+
private void parseTree(JsonNode jsonNode, String prefix){
168+
169+
Iterator<String> iterator = jsonNode.fieldNames();
170+
while (iterator.hasNext()){
171+
String next = iterator.next();
172+
JsonNode child = jsonNode.get(next);
173+
String nodeKey = getNodeKey(prefix, next);
136174

137-
Iterator<String> iter = jsonNode.fieldNames();
138-
while (iter.hasNext()) {
139-
String key1 = iter.next();
140-
if (key1.equalsIgnoreCase(key)) {
141-
return jsonNode.get(key1);
175+
if (child.isValueNode()){
176+
nodeAndJsonNodeMapping.put(nodeKey, child);
177+
}else {
178+
parseTree(child, nodeKey);
142179
}
143180
}
181+
}
144182

145-
return null;
183+
private String getNodeKey(String prefix, String nodeName){
184+
if(Strings.isNullOrEmpty(prefix)){
185+
return nodeName;
186+
}
146187

188+
return prefix + "." + nodeName;
147189
}
148190

149191
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {

0 commit comments

Comments (0)