Skip to content

Commit c98e625

Browse files
committed
[opt-36803][file] Adjust dirty data processing logic.
1 parent 2bac9c5 commit c98e625

3 files changed

Lines changed: 38 additions & 3 deletions

File tree

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/DTCsvRowDeserializationSchema.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.source.file;
2020

21+
import com.dtstack.flink.sql.source.file.throwable.LengthMismatchException;
2122
import org.apache.flink.annotation.PublicEvolving;
2223
import org.apache.flink.api.common.serialization.DeserializationSchema;
2324
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -371,8 +372,9 @@ private static RuntimeConverter createByteArrayRuntimeConverter() {
371372

372373
private static void validateArity(int expected, int actual) {
373374
if (expected != actual) {
374-
LOG.warn("Row length mismatch. " + expected +
375-
" fields expected but was " + actual + ".");
375+
throw new LengthMismatchException(
376+
"Row length mismatch. " + expected +
377+
" fields expected but was " + actual + ".");
376378
}
377379
}
378380
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/FileSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.source.file;
2020

21+
import com.dtstack.flink.sql.exception.ExceptionTrace;
2122
import com.dtstack.flink.sql.metric.MetricConstant;
2223
import com.dtstack.flink.sql.source.IStreamSourceGener;
2324
import com.dtstack.flink.sql.source.file.table.FileSourceTableInfo;
@@ -224,7 +225,7 @@ public void run(SourceContext<Row> ctx) throws Exception {
224225
} catch (IOException e) {
225226
if (errorCounter.getCount() % 1000 == 0) {
226227
LOG.error("Deserialize error! Record: " + line);
227-
LOG.error("Cause: ", e);
228+
LOG.error("Cause: " + ExceptionTrace.traceOriginalCause(e));
228229
}
229230
errorCounter.inc();
230231
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.source.file.throwable;
20+
21+
import org.apache.flink.util.FlinkRuntimeException;
22+
23+
/**
24+
* @author tiezhu
25+
* @since 2021/5/6 3:39 下午
26+
*/
27+
28+
public class LengthMismatchException extends FlinkRuntimeException {
29+
public LengthMismatchException(String message) {
30+
super(message);
31+
}
32+
}

0 commit comments

Comments
 (0)