Skip to content

Commit 6a76b57

Browse files
committed
[opt-1422][file] 调整file source 部分代码
1 parent 93adac1 commit 6a76b57

2 files changed

Lines changed: 33 additions & 3 deletions

File tree

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/FileSource.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.source.file;
220

321
import com.dtstack.flink.sql.metric.MetricConstant;
@@ -57,6 +75,8 @@ public class FileSource extends AbstractRichFunction implements IStreamSourceGen
5775

5876
private InputStream inputStream;
5977

78+
private BufferedReader bufferedReader;
79+
6080
private String charset;
6181

6282
protected transient Counter errorCounter;
@@ -173,7 +193,7 @@ public void run(SourceContext<Row> ctx) throws Exception {
173193
String line;
174194
initMetric();
175195
inputStream = getInputStream(fileUri);
176-
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));
196+
bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));
177197

178198
if (deserializationSchema instanceof DTCsvRowDeserializationSchema) {
179199
fromLine = ((DTCsvRowDeserializationSchema) deserializationSchema).getFromLine();
@@ -185,6 +205,7 @@ public void run(SourceContext<Row> ctx) throws Exception {
185205
if (line == null) {
186206
running.compareAndSet(true, false);
187207
inputStream.close();
208+
bufferedReader.close();
188209
break;
189210
} else {
190211
numInRecord.inc();
@@ -220,7 +241,15 @@ public void cancel() {
220241
try {
221242
inputStream.close();
222243
} catch (IOException ioException) {
223-
LOG.error("File input stream close error!");
244+
LOG.error("File input stream close error!", ioException);
245+
}
246+
}
247+
248+
if (bufferedReader != null) {
249+
try {
250+
bufferedReader.close();
251+
} catch (IOException ioException) {
252+
LOG.error("File buffer reader close error!", ioException);
224253
}
225254
}
226255
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/FileSourceParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.table.AbstractTableInfo;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525

26+
import java.util.Locale;
2627
import java.util.Map;
2728

2829
/**
@@ -50,7 +51,7 @@ public AbstractTableInfo getTableInfo(String tableName,
5051
private FileSourceTableInfo switchTableType(String tableName,
5152
String fieldsInfo,
5253
Map<String, Object> props) {
53-
String format = MathUtil.getString(props.getOrDefault(FileSourceConstant.FORMAT_KEY.toLowerCase(), FileSourceConstant.DEFAULT_FILE_FORMAT));
54+
String format = MathUtil.getString(props.getOrDefault(FileSourceConstant.FORMAT_KEY.toLowerCase(), FileSourceConstant.DEFAULT_FILE_FORMAT)).toLowerCase(Locale.ROOT);
5455
switch (format) {
5556
case "csv": {
5657
CsvSourceTableInfo tableInfo = new CsvSourceTableInfo();

0 commit comments

Comments
 (0)