Skip to content

Commit 17e714b

Browse files
committed
[opt-36803][file] Adjust fileSource metric logic.
1 parent 74eaf32 commit 17e714b

1 file changed

Lines changed: 24 additions & 21 deletions

File tree

  • file/file-source/src/main/java/com/dtstack/flink/sql/source/file

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/FileSource.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -196,32 +196,34 @@ private InputStream fromLocalFile(URI fileUri) throws FileNotFoundException {
196196

197197
@Override
198198
public void run(SourceContext<Row> ctx) throws Exception {
199-
AtomicInteger currentLine = new AtomicInteger(0);
200-
String line;
201199
initMetric();
200+
AtomicInteger currentLine = new AtomicInteger(0);
201+
String line = "";
202202
inputStream = getInputStream(fileUri);
203203
bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));
204204

205-
while (running.get()) {
206-
line = bufferedReader.readLine();
207-
if (line == null) {
208-
running.compareAndSet(true, false);
209-
inputStream.close();
210-
bufferedReader.close();
211-
break;
212-
} else {
213-
if (currentLine.incrementAndGet() < fromLine) {
214-
continue;
215-
}
216-
205+
try {
206+
while (running.get()) {
217207
try {
218-
numInRecord.inc();
219-
Row row = deserializationSchema.deserialize(line.getBytes());
220-
if (row == null) {
221-
throw new IOException("Deserialized row is null");
208+
line = bufferedReader.readLine();
209+
if (line == null) {
210+
running.compareAndSet(true, false);
211+
inputStream.close();
212+
bufferedReader.close();
213+
break;
214+
} else {
215+
if (currentLine.incrementAndGet() < fromLine) {
216+
continue;
217+
}
218+
219+
numInRecord.inc();
220+
Row row = deserializationSchema.deserialize(line.getBytes());
221+
if (row == null) {
222+
throw new IOException("Deserialized row is null");
223+
}
224+
ctx.collect(row);
225+
numInResolveRecord.inc();
222226
}
223-
ctx.collect(row);
224-
numInResolveRecord.inc();
225227
} catch (IOException e) {
226228
if (errorCounter.getCount() % 1000 == 0) {
227229
LOG.error("Deserialize error! Record: " + line);
@@ -230,8 +232,9 @@ public void run(SourceContext<Row> ctx) throws Exception {
230232
errorCounter.inc();
231233
}
232234
}
235+
} finally {
236+
ThreadUtil.sleepSeconds(METRIC_WAIT_TIME);
233237
}
234-
ThreadUtil.sleepSeconds(METRIC_WAIT_TIME);
235238
}
236239

237240
@Override

0 commit comments

Comments
 (0)