2525import com .dtstack .flink .sql .source .kafka .metric .KafkaTopicPartitionLagMetric ;
2626import org .apache .flink .api .common .typeinfo .TypeInformation ;
2727import org .apache .flink .api .java .typeutils .RowTypeInfo ;
28+ import org .apache .flink .calcite .shaded .com .google .common .base .Strings ;
2829import org .apache .flink .metrics .MetricGroup ;
30+ import org .apache .flink .shaded .guava18 .com .google .common .collect .Maps ;
2931import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3032import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
33+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .JsonNodeType ;
3134import org .apache .flink .streaming .connectors .kafka .internal .KafkaConsumerThread ;
3235import org .apache .flink .streaming .connectors .kafka .internals .AbstractFetcher ;
3336import org .apache .flink .types .Row ;
4043import java .io .IOException ;
4144import java .lang .reflect .Field ;
4245import java .util .Iterator ;
46+ import java .util .Map ;
4347import java .util .Set ;
4448
4549import static com .dtstack .flink .sql .metric .MetricConstant .*;
@@ -75,12 +79,19 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7579
7680 private boolean firstMsg = true ;
7781
78- public CustomerJsonDeserialization (TypeInformation <Row > typeInfo ){
82+ private Map <String , JsonNode > nodeAndJsonNodeMapping = Maps .newHashMap ();
83+
84+ private Map <String , String > rowAndFieldMapping ;
85+
86+
87+ public CustomerJsonDeserialization (TypeInformation <Row > typeInfo , Map <String , String > rowAndFieldMapping ){
7988 this .typeInfo = typeInfo ;
8089
8190 this .fieldNames = ((RowTypeInfo ) typeInfo ).getFieldNames ();
8291
8392 this .fieldTypes = ((RowTypeInfo ) typeInfo ).getFieldTypes ();
93+
94+ this .rowAndFieldMapping = rowAndFieldMapping ;
8495 }
8596
8697 @ Override
@@ -101,9 +112,11 @@ public Row deserialize(byte[] message) throws IOException {
101112 numInBytes .inc (message .length );
102113
103114 JsonNode root = objectMapper .readTree (message );
115+ parseTree (root , null );
104116 Row row = new Row (fieldNames .length );
117+
105118 for (int i = 0 ; i < fieldNames .length ; i ++) {
106- JsonNode node = getIgnoreCase (root , fieldNames [i ]);
119+ JsonNode node = getIgnoreCase (fieldNames [i ]);
107120
108121 if (node == null ) {
109122 if (failOnMissingField ) {
@@ -125,25 +138,54 @@ public Row deserialize(byte[] message) throws IOException {
125138 //add metric of dirty data
126139 dirtyDataCounter .inc ();
127140 return null ;
141+ }finally {
142+ nodeAndJsonNodeMapping .clear ();
128143 }
129144 }
130145
131146 public void setFailOnMissingField (boolean failOnMissingField ) {
132147 this .failOnMissingField = failOnMissingField ;
133148 }
134149
135- public JsonNode getIgnoreCase (JsonNode jsonNode , String key ) {
150+ private JsonNode getIgnoreCase (String key ) {
151+ String nodeMappingKey = rowAndFieldMapping .getOrDefault (key , key );
152+ JsonNode node = nodeAndJsonNodeMapping .get (nodeMappingKey );
153+ if (node == null ){
154+ return null ;
155+ }
136156
137- Iterator <String > iter = jsonNode .fieldNames ();
138- while (iter .hasNext ()) {
139- String key1 = iter .next ();
140- if (key1 .equalsIgnoreCase (key )) {
141- return jsonNode .get (key1 );
157+ JsonNodeType nodeType = node .getNodeType ();
158+
159+ if (nodeType == JsonNodeType .ARRAY ){
160+ throw new IllegalStateException ("Unsupported type information array ." ) ;
161+ }
162+
163+ return node ;
164+ }
165+
166+
167+ private void parseTree (JsonNode jsonNode , String prefix ){
168+
169+ Iterator <String > iterator = jsonNode .fieldNames ();
170+ while (iterator .hasNext ()){
171+ String next = iterator .next ();
172+ JsonNode child = jsonNode .get (next );
173+ String nodeKey = getNodeKey (prefix , next );
174+
175+ if (child .isValueNode ()){
176+ nodeAndJsonNodeMapping .put (nodeKey , child );
177+ }else {
178+ parseTree (child , nodeKey );
142179 }
143180 }
181+ }
144182
145- return null ;
183+ private String getNodeKey (String prefix , String nodeName ){
184+ if (Strings .isNullOrEmpty (prefix )){
185+ return nodeName ;
186+ }
146187
188+ return prefix + "." + nodeName ;
147189 }
148190
149191 public void setFetcher (AbstractFetcher <Row , ?> fetcher ) {
0 commit comments