2424import org .apache .flink .api .common .serialization .AbstractDeserializationSchema ;
2525import org .apache .flink .api .common .typeinfo .TypeInformation ;
2626import org .apache .flink .api .common .typeinfo .Types ;
27+ import org .apache .flink .api .java .typeutils .ObjectArrayTypeInfo ;
2728import org .apache .flink .api .java .typeutils .RowTypeInfo ;
2829import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
2930import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3435import org .apache .flink .types .Row ;
3536
3637import java .io .IOException ;
38+ import java .lang .reflect .Array ;
3739import java .sql .Date ;
3840import java .sql .Time ;
3941import java .sql .Timestamp ;
@@ -71,30 +73,7 @@ public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String,
7173 public Row deserialize (byte [] message ) throws IOException {
7274 JsonNode root = objectMapper .readTree (message );
7375 this .parseTree (root , null );
74- Row row = new Row (fieldNames .length );
75-
76- try {
77- for (int i = 0 ; i < fieldNames .length ; i ++) {
78- JsonNode node = getIgnoreCase (fieldNames [i ]);
79- AbstractTableInfo .FieldExtraInfo fieldExtraInfo = fieldExtraInfos .get (i );
80-
81- if (node == null ) {
82- if (fieldExtraInfo != null && fieldExtraInfo .getNotNull ()) {
83- throw new IllegalStateException ("Failed to find field with name '"
84- + fieldNames [i ] + "'." );
85- } else {
86- row .setField (i , null );
87- }
88- } else {
89- // Read the value as specified type
90- Object value = convert (node , fieldTypes [i ]);
91- row .setField (i , value );
92- }
93- }
94- return row ;
95- } finally {
96- nodeAndJsonNodeMapping .clear ();
97- }
76+ return convertTopRow ();
9877 }
9978
10079 private void parseTree (JsonNode jsonNode , String prefix ){
@@ -168,7 +147,11 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
168147 } else if (info .getTypeClass ().equals (Types .SQL_TIMESTAMP .getTypeClass ())) {
169148 // local zone
170149 return Timestamp .valueOf (node .asText ());
171- } else {
150+ } else if (info instanceof RowTypeInfo ) {
151+ return convertRow (node , (RowTypeInfo ) info );
152+ } else if (info instanceof ObjectArrayTypeInfo ) {
153+ return convertObjectArray (node , ((ObjectArrayTypeInfo ) info ).getComponentInfo ());
154+ } else {
172155 // for types that were specified without JSON schema
173156 // e.g. POJOs
174157 try {
@@ -179,5 +162,55 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
179162 }
180163 }
181164
165+ private Row convertTopRow () {
166+ Row row = new Row (fieldNames .length );
167+ try {
168+ for (int i = 0 ; i < fieldNames .length ; i ++) {
169+ JsonNode node = getIgnoreCase (fieldNames [i ]);
170+ AbstractTableInfo .FieldExtraInfo fieldExtraInfo = fieldExtraInfos .get (i );
182171
172+ if (node == null ) {
173+ if (fieldExtraInfo != null && fieldExtraInfo .getNotNull ()) {
174+ throw new IllegalStateException ("Failed to find field with name '"
175+ + fieldNames [i ] + "'." );
176+ } else {
177+ row .setField (i , null );
178+ }
179+ } else {
180+ // Read the value as specified type
181+ Object value = convert (node , fieldTypes [i ]);
182+ row .setField (i , value );
183+ }
184+ }
185+ return row ;
186+ } finally {
187+ nodeAndJsonNodeMapping .clear ();
188+ }
189+ }
190+
191+ private Row convertRow (JsonNode node , RowTypeInfo info ) {
192+ final String [] names = info .getFieldNames ();
193+ final TypeInformation <?>[] types = info .getFieldTypes ();
194+
195+ final Row row = new Row (names .length );
196+ for (int i = 0 ; i < names .length ; i ++) {
197+ final String name = names [i ];
198+ final JsonNode subNode = node .get (name );
199+ if (subNode == null ) {
200+ row .setField (i , null );
201+ } else {
202+ row .setField (i , convert (subNode , types [i ]));
203+ }
204+ }
205+
206+ return row ;
207+ }
208+
209+ private Object convertObjectArray (JsonNode node , TypeInformation <?> elementType ) {
210+ final Object [] array = (Object []) Array .newInstance (elementType .getTypeClass (), node .size ());
211+ for (int i = 0 ; i < node .size (); i ++) {
212+ array [i ] = convert (node .get (i ), elementType );
213+ }
214+ return array ;
215+ }
183216}
0 commit comments