3232import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
3333import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3434import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
35- import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .JsonNodeType ;
3635import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .TextNode ;
3736import org .apache .flink .streaming .connectors .kafka .internal .KafkaConsumerThread ;
3837import org .apache .flink .streaming .connectors .kafka .internals .AbstractFetcher ;
4241import org .apache .kafka .common .TopicPartition ;
4342import org .slf4j .Logger ;
4443import org .slf4j .LoggerFactory ;
45-
44+ import org . apache . flink . shaded . jackson2 . com . fasterxml . jackson . databind . node . ArrayNode ;
4645import java .io .IOException ;
4746import java .lang .reflect .Field ;
4847import java .sql .Date ;
@@ -159,29 +158,37 @@ public Row deserialize(byte[] message) throws IOException {
159158 }
160159 }
161160
162- public void setFailOnMissingField (boolean failOnMissingField ) {
163- this .failOnMissingField = failOnMissingField ;
164- }
165-
166- private JsonNode getIgnoreCase (String key ) {
161+ public JsonNode getIgnoreCase (String key ) {
167162 String nodeMappingKey = rowAndFieldMapping .getOrDefault (key , key );
168- JsonNode node = nodeAndJsonNodeMapping .get (nodeMappingKey );
169- if (node == null ){
170- return null ;
171- }
172-
173- JsonNodeType nodeType = node .getNodeType ();
174163
175- if (nodeType == JsonNodeType .ARRAY ){
176- throw new IllegalStateException ("Unsupported type information array ." ) ;
177- }
164+ return nodeAndJsonNodeMapping .get (nodeMappingKey );
165+ }
178166
179- return node ;
167+ public void setFailOnMissingField (boolean failOnMissingField ) {
168+ this .failOnMissingField = failOnMissingField ;
180169 }
181170
182171
183172 private void parseTree (JsonNode jsonNode , String prefix ){
184173
174+ if (jsonNode .isArray ()) {
175+ ArrayNode array = (ArrayNode ) jsonNode ;
176+ for (int i = 0 ; i < array .size (); i ++) {
177+ JsonNode child = array .get (i );
178+ String nodeKey = getNodeKey (prefix , i );
179+
180+ if (child .isValueNode ()) {
181+ nodeAndJsonNodeMapping .put (nodeKey , child );
182+ } else {
183+ if (rowAndFieldMapping .containsValue (nodeKey )) {
184+ nodeAndJsonNodeMapping .put (nodeKey , child );
185+ }
186+ parseTree (child , nodeKey );
187+ }
188+ }
189+ return ;
190+ }
191+
185192 Iterator <String > iterator = jsonNode .fieldNames ();
186193 while (iterator .hasNext ()){
187194 String next = iterator .next ();
@@ -191,7 +198,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
191198 if (child .isValueNode ()){
192199 nodeAndJsonNodeMapping .put (nodeKey , child );
193200 } else if (child .isArray ()){
194- nodeAndJsonNodeMapping . put ( nodeKey , new TextNode ( child . toString ()) );
201+ parseTree ( child , nodeKey );
195202 }else {
196203 parseTree (child , nodeKey );
197204 }
@@ -206,6 +213,14 @@ private String getNodeKey(String prefix, String nodeName){
206213 return prefix + "." + nodeName ;
207214 }
208215
216+ private String getNodeKey (String prefix , int i ) {
217+ if (Strings .isNullOrEmpty (prefix )) {
218+ return "[" + i + "]" ;
219+ }
220+
221+ return prefix + "[" + i + "]" ;
222+ }
223+
209224 public void setFetcher (AbstractFetcher <Row , ?> fetcher ) {
210225 this .fetcher = fetcher ;
211226 }
0 commit comments