Skip to content

Commit 74eaf32

Browse files
committed
Merge branch 'feat_1.10_4.2.x_fileOpt' into '1.10_test_4.2.x_0426'
Feat 1.10 4.2.x file opt See merge request dt-insight-engine/flinkStreamSQL!273
2 parents 92e5732 + c98e625 commit 74eaf32

6 files changed

Lines changed: 65 additions & 46 deletions

File tree

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/DTCsvRowDeserializationSchema.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.source.file;
2020

21+
import com.dtstack.flink.sql.source.file.throwable.LengthMismatchException;
2122
import org.apache.flink.annotation.PublicEvolving;
2223
import org.apache.flink.api.common.serialization.DeserializationSchema;
2324
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -68,8 +69,6 @@ public class DTCsvRowDeserializationSchema implements DeserializationSchema<Row>
6869
*/
6970
private ObjectReader objectReader;
7071

71-
private int fromLine;
72-
7372
/**
7473
* 字段值的分割符,默认为','
7574
*/
@@ -154,10 +153,6 @@ private CsvSchema initCsvSchema(TypeInformation<Row> typeInfo) {
154153
// Setter
155154
// --------------------------------------------------------------------------------------------
156155

157-
public void setFromLine(int fromLine) {
158-
this.fromLine = fromLine;
159-
}
160-
161156
public void setFieldDelimiter(Character fieldDelimiter) {
162157
this.fieldDelimiter = fieldDelimiter;
163158
}
@@ -186,10 +181,6 @@ public void setTypeInfo(TypeInformation<Row> typeInfo) {
186181
this.typeInfo = typeInfo;
187182
}
188183

189-
public int getFromLine() {
190-
return fromLine;
191-
}
192-
193184
// --------------------------------------------------------------------------------------------
194185
// Builder
195186
// --------------------------------------------------------------------------------------------
@@ -232,11 +223,6 @@ public Builder setEscapeCharacter(Character escapeCharacter) {
232223
return this;
233224
}
234225

235-
public Builder setFromLine(int fromLine) {
236-
deserializationSchema.setFromLine(fromLine);
237-
return this;
238-
}
239-
240226
public Builder setTypeInfo(TypeInformation<Row> typeInfo) {
241227
deserializationSchema.setTypeInfo(typeInfo);
242228
return this;
@@ -386,8 +372,9 @@ private static RuntimeConverter createByteArrayRuntimeConverter() {
386372

387373
private static void validateArity(int expected, int actual) {
388374
if (expected != actual) {
389-
LOG.warn("Row length mismatch. " + expected +
390-
" fields expected but was " + actual + ".");
375+
throw new LengthMismatchException(
376+
"Row length mismatch. " + expected +
377+
" fields expected but was " + actual + ".");
391378
}
392379
}
393380
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/FileSource.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.source.file;
2020

21+
import com.dtstack.flink.sql.exception.ExceptionTrace;
2122
import com.dtstack.flink.sql.metric.MetricConstant;
2223
import com.dtstack.flink.sql.source.IStreamSourceGener;
2324
import com.dtstack.flink.sql.source.file.table.FileSourceTableInfo;
@@ -51,6 +52,7 @@
5152
import java.net.URI;
5253
import java.util.Locale;
5354
import java.util.concurrent.atomic.AtomicBoolean;
55+
import java.util.concurrent.atomic.AtomicInteger;
5456

5557
/**
5658
* @author tiezhu
@@ -79,6 +81,8 @@ public class FileSource extends AbstractRichFunction implements IStreamSourceGen
7981

8082
private String charset;
8183

84+
private int fromLine;
85+
8286
protected transient Counter errorCounter;
8387

8488
/**
@@ -98,9 +102,12 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo,
98102
FileSource fileSource = new FileSource();
99103
FileSourceTableInfo tableInfo = (FileSourceTableInfo) sourceTableInfo;
100104

101-
DataStreamSource<?> source = fileSource.initDataStream(tableInfo, env);
105+
fileSource.initSource(tableInfo);
106+
102107
String fields = StringUtils.join(tableInfo.getFields(), ",");
103108

109+
DataStreamSource<Row> source = env.addSource(fileSource, tableInfo.getOperatorName(), tableInfo.getTypeInformation());
110+
104111
return tableEnv.fromDataStream(source, fields);
105112
}
106113

@@ -115,15 +122,15 @@ public void initMetric() {
115122
numInResolveRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_COUNTER);
116123
}
117124

118-
public DataStreamSource<?> initDataStream(FileSourceTableInfo tableInfo,
119-
StreamExecutionEnvironment env) {
125+
public void initSource(FileSourceTableInfo tableInfo) {
120126
deserializationSchema = tableInfo.getDeserializationSchema();
121127
fileUri = URI.create(tableInfo.getFilePath() + SP + tableInfo.getFileName());
128+
122129
charset = tableInfo.getCharsetName();
123-
return env.addSource(
124-
this,
125-
tableInfo.getOperatorName(),
126-
tableInfo.buildRowTypeInfo());
130+
LOG.info("File charset: " + charset);
131+
132+
fromLine = tableInfo.getFromLine();
133+
LOG.info("Read from line: " + fromLine);
127134
}
128135

129136
/**
@@ -189,17 +196,12 @@ private InputStream fromLocalFile(URI fileUri) throws FileNotFoundException {
189196

190197
@Override
191198
public void run(SourceContext<Row> ctx) throws Exception {
192-
int fromLine = 1;
199+
AtomicInteger currentLine = new AtomicInteger(0);
193200
String line;
194201
initMetric();
195202
inputStream = getInputStream(fileUri);
196203
bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));
197204

198-
if (deserializationSchema instanceof DTCsvRowDeserializationSchema) {
199-
fromLine = ((DTCsvRowDeserializationSchema) deserializationSchema).getFromLine();
200-
LOG.info("Read from line: " + fromLine);
201-
}
202-
203205
while (running.get()) {
204206
line = bufferedReader.readLine();
205207
if (line == null) {
@@ -208,13 +210,12 @@ public void run(SourceContext<Row> ctx) throws Exception {
208210
bufferedReader.close();
209211
break;
210212
} else {
211-
numInRecord.inc();
212-
213-
if (numInRecord.getCount() < fromLine) {
213+
if (currentLine.incrementAndGet() < fromLine) {
214214
continue;
215215
}
216216

217217
try {
218+
numInRecord.inc();
218219
Row row = deserializationSchema.deserialize(line.getBytes());
219220
if (row == null) {
220221
throw new IOException("Deserialized row is null");
@@ -224,7 +225,7 @@ public void run(SourceContext<Row> ctx) throws Exception {
224225
} catch (IOException e) {
225226
if (errorCounter.getCount() % 1000 == 0) {
226227
LOG.error("Deserialize error! Record: " + line);
227-
LOG.error("Cause: ", e);
228+
LOG.error("Cause: " + ExceptionTrace.traceOriginalCause(e));
228229
}
229230
errorCounter.inc();
230231
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/CsvSourceTableInfo.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ public class CsvSourceTableInfo extends FileSourceTableInfo {
5151

5252
private Character escapeCharacter;
5353

54-
private int fromLine;
55-
5654
public Character getFieldDelimiter() {
5755
return fieldDelimiter;
5856
}
@@ -101,14 +99,6 @@ public void setEscapeCharacter(Character escapeCharacter) {
10199
this.escapeCharacter = escapeCharacter;
102100
}
103101

104-
public int getFromLine() {
105-
return fromLine;
106-
}
107-
108-
public void setFromLine(int fromLine) {
109-
this.fromLine = fromLine;
110-
}
111-
112102
public void buildDeserializationSchema() {
113103
DTCsvRowDeserializationSchema dtCsvRowDeserializationSchema = new DTCsvRowDeserializationSchema
114104
.Builder()
@@ -119,7 +109,6 @@ public void buildDeserializationSchema() {
119109
.setArrayElementDelimiter(this.getArrayElementDelimiter())
120110
.setEscapeCharacter(this.getEscapeCharacter())
121111
.setQuoteCharacter(this.getQuoteCharacter())
122-
.setFromLine(this.getFromLine())
123112
.build();
124113
this.setDeserializationSchema(dtCsvRowDeserializationSchema);
125114
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/FileSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public AbstractTableInfo getTableInfo(String tableName,
4242
tableInfo.setFileName(MathUtil.getString(props.get(FileSourceConstant.FILE_NAME_KEY.toLowerCase())));
4343
tableInfo.setFilePath(MathUtil.getString(props.getOrDefault(FileSourceConstant.FILE_PATH_KEY.toLowerCase(), FileSourceConstant.DEFAULT_PATH)));
4444
tableInfo.setCharsetName(MathUtil.getString(props.getOrDefault(FileSourceConstant.CHARSET_NAME_KEY.toLowerCase(), FileSourceConstant.DEFAULT_CHARSET)));
45+
tableInfo.setFromLine(MathUtil.getIntegerVal(props.getOrDefault(FileSourceConstant.FROM_LINE_KEY.toLowerCase(), 1)));
4546

4647
tableInfo.check();
4748

@@ -73,7 +74,6 @@ private FileSourceTableInfo switchTableType(String tableName,
7374
tableInfo.setEscapeCharacter(escapeChar);
7475
}
7576

76-
tableInfo.setFromLine(MathUtil.getIntegerVal(props.getOrDefault(FileSourceConstant.FROM_LINE_KEY.toLowerCase(), 1)));
7777
tableInfo.buildDeserializationSchema();
7878
return tableInfo;
7979
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/FileSourceTableInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class FileSourceTableInfo extends AbstractSourceTableInfo {
5858

5959
private TypeInformation<Row> typeInformation;
6060

61+
private int fromLine;
62+
6163
/*-------------------------------------------------------------*/
6264

6365
public String getFileName() {
@@ -100,6 +102,14 @@ public void setDeserializationSchema(DeserializationSchema<Row> deserializationS
100102
this.deserializationSchema = deserializationSchema;
101103
}
102104

105+
public int getFromLine() {
106+
return fromLine;
107+
}
108+
109+
public void setFromLine(int fromLine) {
110+
this.fromLine = fromLine;
111+
}
112+
103113
@SuppressWarnings("unchecked")
104114
public TypeInformation<Row> buildRowTypeInfo() {
105115
String[] types = getFieldTypes();
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.source.file.throwable;
20+
21+
import org.apache.flink.util.FlinkRuntimeException;
22+
23+
/**
24+
* @author tiezhu
25+
* @since 2021/5/6 3:39 下午
26+
*/
27+
28+
public class LengthMismatchException extends FlinkRuntimeException {
29+
public LengthMismatchException(String message) {
30+
super(message);
31+
}
32+
}

0 commit comments

Comments
 (0)