File tree Expand file tree Collapse file tree
file/file-source/src/main/java/com/dtstack/flink/sql/source/file Expand file tree Collapse file tree Original file line number Diff line number Diff line change 5151import java .net .URI ;
5252import java .util .Locale ;
5353import java .util .concurrent .atomic .AtomicBoolean ;
54+ import java .util .concurrent .atomic .AtomicInteger ;
5455
5556/**
5657 * @author tiezhu
@@ -194,6 +195,7 @@ private InputStream fromLocalFile(URI fileUri) throws FileNotFoundException {
194195
195196 @ Override
196197 public void run (SourceContext <Row > ctx ) throws Exception {
198+ AtomicInteger currentLine = new AtomicInteger (0 );
197199 String line ;
198200 initMetric ();
199201 inputStream = getInputStream (fileUri );
@@ -207,13 +209,12 @@ public void run(SourceContext<Row> ctx) throws Exception {
207209 bufferedReader .close ();
208210 break ;
209211 } else {
210- numInRecord .inc ();
211-
212- if (numInRecord .getCount () < fromLine ) {
212+ if (currentLine .incrementAndGet () < fromLine ) {
213213 continue ;
214214 }
215215
216216 try {
217+ numInRecord .inc ();
217218 Row row = deserializationSchema .deserialize (line .getBytes ());
218219 if (row == null ) {
219220 throw new IOException ("Deserialized row is null" );
You can’t perform that action at this time.
0 commit comments