Skip to content

Commit 6e50d11

Browse files
committed
[opt-36803][file] Optimize FileSource code and move the "fromLine" parameter to FileSourceTableInfo
1 parent 92e5732 commit 6e50d11

5 files changed

Lines changed: 23 additions & 40 deletions

File tree

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/DTCsvRowDeserializationSchema.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ public class DTCsvRowDeserializationSchema implements DeserializationSchema<Row>
6868
*/
6969
private ObjectReader objectReader;
7070

71-
private int fromLine;
72-
7371
/**
7472
* 字段值的分割符,默认为','
7573
*/
@@ -154,10 +152,6 @@ private CsvSchema initCsvSchema(TypeInformation<Row> typeInfo) {
154152
// Setter
155153
// --------------------------------------------------------------------------------------------
156154

157-
public void setFromLine(int fromLine) {
158-
this.fromLine = fromLine;
159-
}
160-
161155
public void setFieldDelimiter(Character fieldDelimiter) {
162156
this.fieldDelimiter = fieldDelimiter;
163157
}
@@ -186,10 +180,6 @@ public void setTypeInfo(TypeInformation<Row> typeInfo) {
186180
this.typeInfo = typeInfo;
187181
}
188182

189-
public int getFromLine() {
190-
return fromLine;
191-
}
192-
193183
// --------------------------------------------------------------------------------------------
194184
// Builder
195185
// --------------------------------------------------------------------------------------------
@@ -232,11 +222,6 @@ public Builder setEscapeCharacter(Character escapeCharacter) {
232222
return this;
233223
}
234224

235-
public Builder setFromLine(int fromLine) {
236-
deserializationSchema.setFromLine(fromLine);
237-
return this;
238-
}
239-
240225
public Builder setTypeInfo(TypeInformation<Row> typeInfo) {
241226
deserializationSchema.setTypeInfo(typeInfo);
242227
return this;

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/FileSource.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ public class FileSource extends AbstractRichFunction implements IStreamSourceGen
7979

8080
private String charset;
8181

82+
private int fromLine;
83+
8284
protected transient Counter errorCounter;
8385

8486
/**
@@ -98,9 +100,12 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo,
98100
FileSource fileSource = new FileSource();
99101
FileSourceTableInfo tableInfo = (FileSourceTableInfo) sourceTableInfo;
100102

101-
DataStreamSource<?> source = fileSource.initDataStream(tableInfo, env);
103+
fileSource.initSource(tableInfo);
104+
102105
String fields = StringUtils.join(tableInfo.getFields(), ",");
103106

107+
DataStreamSource<Row> source = env.addSource(fileSource, tableInfo.getOperatorName(), tableInfo.getTypeInformation());
108+
104109
return tableEnv.fromDataStream(source, fields);
105110
}
106111

@@ -115,15 +120,15 @@ public void initMetric() {
115120
numInResolveRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_COUNTER);
116121
}
117122

118-
public DataStreamSource<?> initDataStream(FileSourceTableInfo tableInfo,
119-
StreamExecutionEnvironment env) {
123+
public void initSource(FileSourceTableInfo tableInfo) {
120124
deserializationSchema = tableInfo.getDeserializationSchema();
121125
fileUri = URI.create(tableInfo.getFilePath() + SP + tableInfo.getFileName());
126+
122127
charset = tableInfo.getCharsetName();
123-
return env.addSource(
124-
this,
125-
tableInfo.getOperatorName(),
126-
tableInfo.buildRowTypeInfo());
128+
LOG.info("File charset: " + charset);
129+
130+
fromLine = tableInfo.getFromLine();
131+
LOG.info("Read from line: " + fromLine);
127132
}
128133

129134
/**
@@ -189,17 +194,11 @@ private InputStream fromLocalFile(URI fileUri) throws FileNotFoundException {
189194

190195
@Override
191196
public void run(SourceContext<Row> ctx) throws Exception {
192-
int fromLine = 1;
193197
String line;
194198
initMetric();
195199
inputStream = getInputStream(fileUri);
196200
bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));
197201

198-
if (deserializationSchema instanceof DTCsvRowDeserializationSchema) {
199-
fromLine = ((DTCsvRowDeserializationSchema) deserializationSchema).getFromLine();
200-
LOG.info("Read from line: " + fromLine);
201-
}
202-
203202
while (running.get()) {
204203
line = bufferedReader.readLine();
205204
if (line == null) {

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/CsvSourceTableInfo.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ public class CsvSourceTableInfo extends FileSourceTableInfo {
5151

5252
private Character escapeCharacter;
5353

54-
private int fromLine;
55-
5654
public Character getFieldDelimiter() {
5755
return fieldDelimiter;
5856
}
@@ -101,14 +99,6 @@ public void setEscapeCharacter(Character escapeCharacter) {
10199
this.escapeCharacter = escapeCharacter;
102100
}
103101

104-
public int getFromLine() {
105-
return fromLine;
106-
}
107-
108-
public void setFromLine(int fromLine) {
109-
this.fromLine = fromLine;
110-
}
111-
112102
public void buildDeserializationSchema() {
113103
DTCsvRowDeserializationSchema dtCsvRowDeserializationSchema = new DTCsvRowDeserializationSchema
114104
.Builder()
@@ -119,7 +109,6 @@ public void buildDeserializationSchema() {
119109
.setArrayElementDelimiter(this.getArrayElementDelimiter())
120110
.setEscapeCharacter(this.getEscapeCharacter())
121111
.setQuoteCharacter(this.getQuoteCharacter())
122-
.setFromLine(this.getFromLine())
123112
.build();
124113
this.setDeserializationSchema(dtCsvRowDeserializationSchema);
125114
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/FileSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public AbstractTableInfo getTableInfo(String tableName,
4242
tableInfo.setFileName(MathUtil.getString(props.get(FileSourceConstant.FILE_NAME_KEY.toLowerCase())));
4343
tableInfo.setFilePath(MathUtil.getString(props.getOrDefault(FileSourceConstant.FILE_PATH_KEY.toLowerCase(), FileSourceConstant.DEFAULT_PATH)));
4444
tableInfo.setCharsetName(MathUtil.getString(props.getOrDefault(FileSourceConstant.CHARSET_NAME_KEY.toLowerCase(), FileSourceConstant.DEFAULT_CHARSET)));
45+
tableInfo.setFromLine(MathUtil.getIntegerVal(props.getOrDefault(FileSourceConstant.FROM_LINE_KEY.toLowerCase(), 1)));
4546

4647
tableInfo.check();
4748

@@ -73,7 +74,6 @@ private FileSourceTableInfo switchTableType(String tableName,
7374
tableInfo.setEscapeCharacter(escapeChar);
7475
}
7576

76-
tableInfo.setFromLine(MathUtil.getIntegerVal(props.getOrDefault(FileSourceConstant.FROM_LINE_KEY.toLowerCase(), 1)));
7777
tableInfo.buildDeserializationSchema();
7878
return tableInfo;
7979
}

file/file-source/src/main/java/com/dtstack/flink/sql/source/file/table/FileSourceTableInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class FileSourceTableInfo extends AbstractSourceTableInfo {
5858

5959
private TypeInformation<Row> typeInformation;
6060

61+
private int fromLine;
62+
6163
/*-------------------------------------------------------------*/
6264

6365
public String getFileName() {
@@ -100,6 +102,14 @@ public void setDeserializationSchema(DeserializationSchema<Row> deserializationS
100102
this.deserializationSchema = deserializationSchema;
101103
}
102104

105+
public int getFromLine() {
106+
return fromLine;
107+
}
108+
109+
public void setFromLine(int fromLine) {
110+
this.fromLine = fromLine;
111+
}
112+
103113
@SuppressWarnings("unchecked")
104114
public TypeInformation<Row> buildRowTypeInfo() {
105115
String[] types = getFieldTypes();

0 commit comments

Comments
 (0)