Skip to content

Commit 577e4bc

Browse files
committed
[opt-1422][file] file source增加数据指标,增加fromLine参数,调整部分逻辑
1 parent cc32339 commit 577e4bc

8 files changed

Lines changed: 551 additions & 199 deletions

File tree

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.source.file;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.common.serialization.DeserializationSchema;
23+
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
24+
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
25+
import org.apache.flink.api.common.typeinfo.TypeInformation;
26+
import org.apache.flink.api.common.typeinfo.Types;
27+
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
28+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
29+
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
30+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
31+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
32+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
34+
import org.apache.flink.types.Row;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import java.io.IOException;
39+
import java.io.Serializable;
40+
import java.lang.reflect.Array;
41+
import java.math.BigDecimal;
42+
import java.math.BigInteger;
43+
import java.sql.Date;
44+
import java.sql.Time;
45+
import java.sql.Timestamp;
46+
47+
/**
48+
* @author tiezhu
49+
* @date 2021/4/14 星期三
50+
* Company dtstack
51+
*/
52+
@PublicEvolving
53+
public class DTCsvRowDeserializationSchema implements DeserializationSchema<Row> {
54+
private static final Logger LOG = LoggerFactory.getLogger(DTCsvRowDeserializationSchema.class);
55+
56+
/**
57+
* Type information describing the result type.
58+
*/
59+
private TypeInformation<Row> typeInfo;
60+
61+
/**
62+
* Runtime instance that performs the actual work.
63+
*/
64+
private RuntimeConverter runtimeConverter;
65+
66+
/**
67+
* Object reader used to read rows. It is configured by {@link CsvSchema}.
68+
*/
69+
private ObjectReader objectReader;
70+
71+
private int fromLine;
72+
73+
/**
74+
* 字段值的分割符,默认为','
75+
*/
76+
private Character fieldDelimiter;
77+
78+
/**
79+
* 针对null的替换值,默认为"null"字符
80+
*/
81+
private String nullLiteral;
82+
83+
/**
84+
* 默认为true
85+
*/
86+
private Boolean allowComments;
87+
88+
/**
89+
* 数组元素分割符,默认为','
90+
*/
91+
private String arrayElementDelimiter;
92+
93+
private Character quoteCharacter;
94+
95+
private Character escapeCharacter;
96+
97+
@Override
98+
public Row deserialize(byte[] message) throws IOException {
99+
try {
100+
final JsonNode root = objectReader.readValue(message);
101+
return (Row) runtimeConverter.convert(root);
102+
} catch (Exception e) {
103+
throw new IOException(e);
104+
}
105+
}
106+
107+
@Override
108+
public boolean isEndOfStream(Row nextElement) {
109+
return false;
110+
}
111+
112+
@Override
113+
public TypeInformation<Row> getProducedType() {
114+
return typeInfo;
115+
}
116+
117+
private void init() {
118+
runtimeConverter = createRowRuntimeConverter((RowTypeInfo) typeInfo, true);
119+
CsvSchema csvSchema = initCsvSchema(typeInfo);
120+
objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
121+
}
122+
123+
private CsvSchema initCsvSchema(TypeInformation<Row> typeInfo) {
124+
125+
CsvSchema.Builder rebuild = CsvRowSchemaConverter
126+
.convert((RowTypeInfo) typeInfo)
127+
.rebuild()
128+
.setAllowComments(allowComments);
129+
130+
if (arrayElementDelimiter != null) {
131+
rebuild.setArrayElementSeparator(arrayElementDelimiter);
132+
}
133+
134+
if (fieldDelimiter != null) {
135+
rebuild.setColumnSeparator(fieldDelimiter);
136+
}
137+
138+
if (quoteCharacter != null) {
139+
rebuild.setQuoteChar(quoteCharacter);
140+
}
141+
142+
if (nullLiteral != null) {
143+
rebuild.setNullValue(nullLiteral);
144+
}
145+
146+
if (escapeCharacter != null) {
147+
rebuild.setEscapeChar(escapeCharacter);
148+
}
149+
150+
return rebuild.build();
151+
}
152+
153+
// --------------------------------------------------------------------------------------------
154+
// Setter
155+
// --------------------------------------------------------------------------------------------
156+
157+
public void setFromLine(int fromLine) {
158+
this.fromLine = fromLine;
159+
}
160+
161+
public void setFieldDelimiter(Character fieldDelimiter) {
162+
this.fieldDelimiter = fieldDelimiter;
163+
}
164+
165+
public void setNullLiteral(String nullLiteral) {
166+
this.nullLiteral = nullLiteral;
167+
}
168+
169+
public void setAllowComments(Boolean allowComments) {
170+
this.allowComments = allowComments;
171+
}
172+
173+
public void setArrayElementDelimiter(String arrayElementDelimiter) {
174+
this.arrayElementDelimiter = arrayElementDelimiter;
175+
}
176+
177+
public void setQuoteCharacter(Character quoteCharacter) {
178+
this.quoteCharacter = quoteCharacter;
179+
}
180+
181+
public void setEscapeCharacter(Character escapeCharacter) {
182+
this.escapeCharacter = escapeCharacter;
183+
}
184+
185+
public void setTypeInfo(TypeInformation<Row> typeInfo) {
186+
this.typeInfo = typeInfo;
187+
}
188+
189+
public int getFromLine() {
190+
return fromLine;
191+
}
192+
193+
// --------------------------------------------------------------------------------------------
194+
// Builder
195+
// --------------------------------------------------------------------------------------------
196+
197+
public static class Builder {
198+
199+
private final DTCsvRowDeserializationSchema deserializationSchema;
200+
201+
public Builder() {
202+
deserializationSchema = new DTCsvRowDeserializationSchema();
203+
}
204+
205+
public Builder setFieldDelimiter(Character fieldDelimiter) {
206+
deserializationSchema.setFieldDelimiter(fieldDelimiter);
207+
return this;
208+
}
209+
210+
public Builder setNullLiteral(String nullLiteral) {
211+
deserializationSchema.setNullLiteral(nullLiteral);
212+
return this;
213+
}
214+
215+
public Builder setAllowComments(Boolean allowComments) {
216+
deserializationSchema.setAllowComments(allowComments);
217+
return this;
218+
}
219+
220+
public Builder setArrayElementDelimiter(String arrayElementDelimiter) {
221+
deserializationSchema.setArrayElementDelimiter(arrayElementDelimiter);
222+
return this;
223+
}
224+
225+
public Builder setQuoteCharacter(Character quoteCharacter) {
226+
deserializationSchema.setQuoteCharacter(quoteCharacter);
227+
return this;
228+
}
229+
230+
public Builder setEscapeCharacter(Character escapeCharacter) {
231+
deserializationSchema.setEscapeCharacter(escapeCharacter);
232+
return this;
233+
}
234+
235+
public Builder setFromLine(int fromLine) {
236+
deserializationSchema.setFromLine(fromLine);
237+
return this;
238+
}
239+
240+
public Builder setTypeInfo(TypeInformation<Row> typeInfo) {
241+
deserializationSchema.setTypeInfo(typeInfo);
242+
return this;
243+
}
244+
245+
public DTCsvRowDeserializationSchema build() {
246+
deserializationSchema.init();
247+
return deserializationSchema;
248+
}
249+
}
250+
251+
// --------------------------------------------------------------------------------------------
252+
// RuntimeConverter
253+
// --------------------------------------------------------------------------------------------
254+
255+
private interface RuntimeConverter extends Serializable {
256+
Object convert(JsonNode node) throws IOException;
257+
}
258+
259+
private static RuntimeConverter createRowRuntimeConverter(
260+
RowTypeInfo rowTypeInfo,
261+
boolean isTopLevel) {
262+
final TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
263+
final String[] fieldNames = rowTypeInfo.getFieldNames();
264+
265+
final RuntimeConverter[] fieldConverters =
266+
createFieldRuntimeConverters(fieldTypes);
267+
268+
return assembleRowRuntimeConverter(isTopLevel, fieldNames, fieldConverters);
269+
}
270+
271+
private static RuntimeConverter[] createFieldRuntimeConverters(TypeInformation<?>[] fieldTypes) {
272+
final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length];
273+
for (int i = 0; i < fieldTypes.length; i++) {
274+
fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i]);
275+
}
276+
return fieldConverters;
277+
}
278+
279+
private static RuntimeConverter assembleRowRuntimeConverter(
280+
boolean isTopLevel,
281+
String[] fieldNames,
282+
RuntimeConverter[] fieldConverters) {
283+
final int rowArity = fieldNames.length;
284+
285+
return (node) -> {
286+
final int nodeSize = node.size();
287+
288+
validateArity(rowArity, nodeSize);
289+
290+
final Row row = new Row(rowArity);
291+
for (int i = 0; i < Math.min(rowArity, nodeSize); i++) {
292+
// Jackson only supports mapping by name in the first level
293+
if (isTopLevel) {
294+
row.setField(i, fieldConverters[i].convert(node.get(fieldNames[i])));
295+
} else {
296+
row.setField(i, fieldConverters[i].convert(node.get(i)));
297+
}
298+
}
299+
return row;
300+
};
301+
}
302+
303+
private static RuntimeConverter createNullableRuntimeConverter(
304+
TypeInformation<?> info) {
305+
final RuntimeConverter valueConverter = createRuntimeConverter(info);
306+
return (node) -> {
307+
if (node.isNull()) {
308+
return null;
309+
}
310+
return valueConverter.convert(node);
311+
};
312+
}
313+
314+
private static RuntimeConverter createRuntimeConverter(TypeInformation<?> info) {
315+
if (info.equals(Types.VOID)) {
316+
return (node) -> null;
317+
} else if (info.equals(Types.STRING)) {
318+
return JsonNode::asText;
319+
} else if (info.equals(Types.BOOLEAN)) {
320+
return (node) -> Boolean.valueOf(node.asText().trim());
321+
} else if (info.equals(Types.BYTE)) {
322+
return (node) -> Byte.valueOf(node.asText().trim());
323+
} else if (info.equals(Types.SHORT)) {
324+
return (node) -> Short.valueOf(node.asText().trim());
325+
} else if (info.equals(Types.INT)) {
326+
return (node) -> Integer.valueOf(node.asText().trim());
327+
} else if (info.equals(Types.LONG)) {
328+
return (node) -> Long.valueOf(node.asText().trim());
329+
} else if (info.equals(Types.FLOAT)) {
330+
return (node) -> Float.valueOf(node.asText().trim());
331+
} else if (info.equals(Types.DOUBLE)) {
332+
return (node) -> Double.valueOf(node.asText().trim());
333+
} else if (info.equals(Types.BIG_DEC)) {
334+
return (node) -> new BigDecimal(node.asText().trim());
335+
} else if (info.equals(Types.BIG_INT)) {
336+
return (node) -> new BigInteger(node.asText().trim());
337+
} else if (info.equals(Types.SQL_DATE)) {
338+
return (node) -> Date.valueOf(node.asText());
339+
} else if (info.equals(Types.SQL_TIME)) {
340+
return (node) -> Time.valueOf(node.asText());
341+
} else if (info.equals(Types.SQL_TIMESTAMP)) {
342+
return (node) -> Timestamp.valueOf(node.asText());
343+
} else if (info.equals(Types.LOCAL_DATE)) {
344+
return (node) -> Date.valueOf(node.asText()).toLocalDate();
345+
} else if (info.equals(Types.LOCAL_TIME)) {
346+
return (node) -> Time.valueOf(node.asText()).toLocalTime();
347+
} else if (info.equals(Types.LOCAL_DATE_TIME)) {
348+
return (node) -> Timestamp.valueOf(node.asText()).toLocalDateTime();
349+
} else if (info instanceof RowTypeInfo) {
350+
final RowTypeInfo rowTypeInfo = (RowTypeInfo) info;
351+
return createRowRuntimeConverter(rowTypeInfo, false);
352+
} else if (info instanceof BasicArrayTypeInfo) {
353+
return createObjectArrayRuntimeConverter(
354+
((BasicArrayTypeInfo<?, ?>) info).getComponentInfo()
355+
);
356+
} else if (info instanceof ObjectArrayTypeInfo) {
357+
return createObjectArrayRuntimeConverter(
358+
((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo()
359+
);
360+
} else if (info instanceof PrimitiveArrayTypeInfo &&
361+
((PrimitiveArrayTypeInfo<?>) info).getComponentType() == Types.BYTE) {
362+
return createByteArrayRuntimeConverter();
363+
} else {
364+
throw new RuntimeException("Unsupported type information '" + info + "'.");
365+
}
366+
}
367+
368+
private static RuntimeConverter createObjectArrayRuntimeConverter(
369+
TypeInformation<?> elementType) {
370+
final Class<?> elementClass = elementType.getTypeClass();
371+
final RuntimeConverter elementConverter = createNullableRuntimeConverter(elementType);
372+
373+
return (node) -> {
374+
final int nodeSize = node.size();
375+
final Object[] array = (Object[]) Array.newInstance(elementClass, nodeSize);
376+
for (int i = 0; i < nodeSize; i++) {
377+
array[i] = elementConverter.convert(node.get(i));
378+
}
379+
return array;
380+
};
381+
}
382+
383+
private static RuntimeConverter createByteArrayRuntimeConverter() {
384+
return JsonNode::binaryValue;
385+
}
386+
387+
private static void validateArity(int expected, int actual) {
388+
if (expected != actual) {
389+
LOG.warn("Row length mismatch. " + expected +
390+
" fields expected but was " + actual + ".");
391+
}
392+
}
393+
}

0 commit comments

Comments
 (0)