Skip to content

Commit 7480358

Browse files
committed
row2baserow
1 parent c00bd68 commit 7480358

7 files changed

Lines changed: 61 additions & 72 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
package com.dtstack.flink.sql.side;
2121

22+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2223
import com.dtstack.flink.sql.util.RowDataComplete;
24+
import org.apache.calcite.sql.JoinType;
2325
import org.apache.flink.api.common.functions.RichFlatMapFunction;
24-
import org.apache.flink.api.java.tuple.Tuple2;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.table.dataformat.BaseRow;
2728
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -30,9 +31,6 @@
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

33-
import com.dtstack.flink.sql.factory.DTThreadFactory;
34-
import org.apache.calcite.sql.JoinType;
35-
3634
import java.sql.SQLException;
3735
import java.sql.Timestamp;
3836
import java.time.LocalDateTime;
@@ -48,7 +46,7 @@
4846
* @author xuchao
4947
*/
5048

51-
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, BaseRow>> implements ISideReqRow {
49+
public abstract class BaseAllReqRow extends RichFlatMapFunction<Row, BaseRow> implements ISideReqRow {
5250

5351
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
5452

@@ -89,12 +87,12 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8987
return obj;
9088
}
9189

92-
protected void sendOutputRow(Tuple2<Boolean, Row> value, Object sideInput, Collector<Tuple2<Boolean, BaseRow>> out) {
90+
protected void sendOutputRow(Row value, Object sideInput, Collector<BaseRow> out) {
9391
if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
9492
return;
9593
}
96-
Row row = fillData(value.f1, sideInput);
97-
RowDataComplete.collectTupleRow(out, Tuple2.of(value.f0, row));
94+
Row row = fillData(value, sideInput);
95+
RowDataComplete.collectRow(out, row);
9896
}
9997

10098
@Override

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
* @author xuchao
6565
*/
6666

67-
public abstract class BaseAsyncReqRow extends RichAsyncFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, BaseRow>> implements ISideReqRow {
67+
public abstract class BaseAsyncReqRow extends RichAsyncFunction<Row, BaseRow> implements ISideReqRow {
6868
private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
6969
private static final long serialVersionUID = 2098635244857937717L;
7070
private RuntimeContext runtimeContext;
@@ -135,12 +135,12 @@ protected boolean openCache() {
135135
return sideInfo.getSideCache() != null;
136136
}
137137

138-
protected void dealMissKey(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) {
138+
protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
139139
if (sideInfo.getJoinType() == JoinType.LEFT) {
140140
//Reserved left table data
141141
try {
142-
Row row = fillData(input.f1, null);
143-
RowDataComplete.completeTupleRows(resultFuture, Collections.singleton(new Tuple2<>(input.f0, row)));
142+
Row row = fillData(input, null);
143+
RowDataComplete.completeRow(resultFuture, row);
144144
} catch (Exception e) {
145145
dealFillDataError(input, resultFuture, e);
146146
}
@@ -156,7 +156,7 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
156156
}
157157

158158
@Override
159-
public void timeout(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
159+
public void timeout(Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
160160

161161
if (timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0) {
162162
LOG.info("Async function call has timed out. input:{}, timeOutNum:{}", input.toString(), timeOutNum);
@@ -173,14 +173,14 @@ public void timeout(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, Bas
173173
resultFuture.complete(null);
174174
}
175175

176-
protected void preInvoke(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture)
176+
protected void preInvoke(Row input, ResultFuture<BaseRow> resultFuture)
177177
throws InvocationTargetException, IllegalAccessException {
178178
registerTimerAndAddToHandler(input, resultFuture);
179179
}
180180

181181
@Override
182-
public void asyncInvoke(Tuple2<Boolean, Row> row, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
183-
Tuple2<Boolean, Row> input = Tuple2.of(row.f0, Row.copy(row.f1));
182+
public void asyncInvoke(Row row, ResultFuture<BaseRow> resultFuture) throws Exception {
183+
Row input = Row.copy(row);
184184
preInvoke(input, resultFuture);
185185
Map<String, Object> inputParams = parseInputParam(input);
186186
if (MapUtils.isEmpty(inputParams)) {
@@ -194,11 +194,11 @@ public void asyncInvoke(Tuple2<Boolean, Row> row, ResultFuture<Tuple2<Boolean, B
194194
handleAsyncInvoke(inputParams, input, resultFuture);
195195
}
196196

197-
private Map<String, Object> parseInputParam(Tuple2<Boolean, Row> input) {
197+
private Map<String, Object> parseInputParam(Row input) {
198198
Map<String, Object> inputParams = Maps.newHashMap();
199199
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
200200
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
201-
Object equalObj = input.f1.getField(conValIndex);
201+
Object equalObj = input.getField(conValIndex);
202202
if (equalObj == null) {
203203
return inputParams;
204204
}
@@ -212,7 +212,7 @@ protected boolean isUseCache(Map<String, Object> inputParams) {
212212
return openCache() && getFromCache(buildCacheKey(inputParams)) != null;
213213
}
214214

215-
private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) {
215+
private void invokeWithCache(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) {
216216
if (openCache()) {
217217
CacheObj val = getFromCache(buildCacheKey(inputParams));
218218
if (val != null) {
@@ -221,21 +221,19 @@ private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean, Ro
221221
return;
222222
} else if (ECacheContentType.SingleLine == val.getType()) {
223223
try {
224-
Row row = fillData(input.f1, val.getContent());
225-
BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
226-
resultFuture.complete(Collections.singleton(Tuple2.of(input.f0, baseRow)));
224+
Row row = fillData(input, val.getContent());
225+
RowDataComplete.completeRow(resultFuture, row);
227226
} catch (Exception e) {
228227
dealFillDataError(input, resultFuture, e);
229228
}
230229
} else if (ECacheContentType.MultiLine == val.getType()) {
231230
try {
232-
List<Tuple2<Boolean, BaseRow>> rowList = Lists.newArrayList();
231+
List<Row> rowList = Lists.newArrayList();
233232
for (Object one : (List) val.getContent()) {
234-
Row row = fillData(input.f1, one);
235-
BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
236-
rowList.add(Tuple2.of(input.f0, baseRow));
233+
Row row = fillData(input, one);
234+
rowList.add(row);
237235
}
238-
resultFuture.complete(rowList);
236+
RowDataComplete.completeRow(resultFuture,rowList);
239237
} catch (Exception e) {
240238
dealFillDataError(input, resultFuture, e);
241239
}
@@ -247,22 +245,22 @@ private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean, Ro
247245
}
248246
}
249247

250-
public abstract void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception;
248+
public abstract void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception;
251249

252250
public abstract String buildCacheKey(Map<String, Object> inputParams);
253251

254252
private ProcessingTimeService getProcessingTimeService() {
255253
return ((StreamingRuntimeContext) this.runtimeContext).getProcessingTimeService();
256254
}
257255

258-
protected ScheduledFuture<?> registerTimer(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) {
256+
protected ScheduledFuture<?> registerTimer(Row input, ResultFuture<BaseRow> resultFuture) {
259257
long timeoutTimestamp = sideInfo.getSideTableInfo().getAsyncTimeout() + getProcessingTimeService().getCurrentProcessingTime();
260258
return getProcessingTimeService().registerTimer(
261259
timeoutTimestamp,
262260
timestamp -> timeout(input, resultFuture));
263261
}
264262

265-
protected void registerTimerAndAddToHandler(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture)
263+
protected void registerTimerAndAddToHandler(Row input, ResultFuture<BaseRow> resultFuture)
266264
throws InvocationTargetException, IllegalAccessException {
267265
ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
268266
// resultFuture 是ResultHandler 的实例
@@ -272,7 +270,7 @@ protected void registerTimerAndAddToHandler(Tuple2<Boolean, Row> input, ResultFu
272270
}
273271

274272

275-
protected void dealFillDataError(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture, Throwable e) {
273+
protected void dealFillDataError(Row input, ResultFuture<BaseRow> resultFuture, Throwable e) {
276274
parseErrorRecords.inc();
277275
if (parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)) {
278276
LOG.info("dealFillDataError", e);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,10 @@ private void joinFun(Object pollObj,
428428

429429
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
430430

431-
DataStream<Tuple2<Boolean, Row>> adaptStream = tableEnv.toRetractStream(targetTable, Row.class);
431+
DataStream adaptStream = tableEnv.toRetractStream(targetTable, Row.class)
432+
.filter(f -> f.f0)
433+
.map(f -> f.f1)
434+
.returns(Row.class);
432435

433436
//join side table before keyby ===> Reducing the size of each dimension table cache of async
434437
if (sideTableInfo.isPartitionedJoin()) {
@@ -438,7 +441,7 @@ private void joinFun(Object pollObj,
438441
adaptStream = adaptStream.keyBy(new TupleKeySelector(keyIndex, projectedTypeInfo(keyIndex, targetTable.getSchema())));
439442
}
440443

441-
DataStream<Tuple2<Boolean, Row>> dsOut = null;
444+
DataStream dsOut = null;
442445
if(ECacheType.ALL.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
443446
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo, pluginLoadMode);
444447
}else{
@@ -447,8 +450,7 @@ private void joinFun(Object pollObj,
447450

448451
BaseRowTypeInfo sideOutTypeInfo = buildOutRowTypeInfo(sideJoinFieldInfo, mappingTable);
449452

450-
TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(Types.BOOLEAN, sideOutTypeInfo);
451-
dsOut.getTransformation().setOutputType(tupleTypeInfo);
453+
dsOut.getTransformation().setOutputType(sideOutTypeInfo);
452454

453455
String targetTableName = joinInfo.getNewTableName();
454456
String targetTableAlias = joinInfo.getNewTableAlias();

core/src/main/java/com/dtstack/flink/sql/side/TupleKeySelector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* Company: www.dtstack.com
3030
* @author maqi
3131
*/
32-
public class TupleKeySelector implements ResultTypeQueryable<Row>, KeySelector<Tuple2<Boolean, Row>, Row> {
32+
public class TupleKeySelector implements ResultTypeQueryable<Row>, KeySelector<Row, Row> {
3333

3434
private int[] keyFields;
3535
private TypeInformation<Row> returnType;
@@ -40,8 +40,8 @@ public TupleKeySelector(int[] keyFields, TypeInformation<Row> returnType) {
4040
}
4141

4242
@Override
43-
public Row getKey(Tuple2<Boolean, Row> value) throws Exception {
44-
return Row.project(value.f1, keyFields);
43+
public Row getKey(Row value) throws Exception {
44+
return Row.project(value, keyFields);
4545
}
4646

4747
@Override

core/src/main/java/com/dtstack/flink/sql/util/RowDataComplete.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static void completeRow(ResultFuture<BaseRow> resultFuture, List<Row> row
5757
resultFuture.complete(baseRowList);
5858
}
5959

60-
public static void completeTupleRows(ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture, Collection<Tuple2<Boolean, Row>> tupleRowList) {
60+
public static void completeTupleRow(ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture, Collection<Tuple2<Boolean, Row>> tupleRowList) {
6161
List<Tuple2<Boolean, BaseRow>> baseRowList = Lists.newArrayList();
6262
for (Tuple2<Boolean, Row> rowTuple : tupleRowList) {
6363
baseRowList.add(new Tuple2<>(rowTuple.f0, RowDataConvert.convertToBaseRow(rowTuple.f1)));

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,29 @@
1818

1919
package com.dtstack.flink.sql.side.rdb.all;
2020

21-
import com.dtstack.flink.sql.util.RowDataComplete;
22-
import com.dtstack.flink.sql.util.RowDataConvert;
23-
import org.apache.flink.api.common.typeinfo.TypeInformation;
24-
import org.apache.flink.api.java.tuple.Tuple2;
25-
import org.apache.flink.configuration.Configuration;
26-
import org.apache.flink.table.dataformat.BaseRow;
27-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
28-
import org.apache.flink.types.Row;
29-
import org.apache.flink.util.Collector;
30-
31-
3221
import com.dtstack.flink.sql.side.BaseAllReqRow;
3322
import com.dtstack.flink.sql.side.BaseSideInfo;
3423
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
3524
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
25+
import com.dtstack.flink.sql.util.RowDataComplete;
26+
import com.dtstack.flink.sql.util.RowDataConvert;
3627
import com.google.common.collect.Lists;
3728
import com.google.common.collect.Maps;
3829
import org.apache.calcite.sql.JoinType;
3930
import org.apache.commons.collections.CollectionUtils;
4031
import org.apache.commons.lang3.StringUtils;
32+
import org.apache.flink.api.common.typeinfo.TypeInformation;
33+
import org.apache.flink.configuration.Configuration;
34+
import org.apache.flink.table.dataformat.BaseRow;
35+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
36+
import org.apache.flink.types.Row;
4137
import org.apache.flink.util.Collector;
4238
import org.slf4j.Logger;
4339
import org.slf4j.LoggerFactory;
4440

45-
import java.sql.Connection;
46-
import java.sql.ResultSet;
47-
import java.sql.SQLException;
48-
import java.sql.Statement;
49-
import java.sql.Timestamp;
50-
import java.util.ArrayList;
41+
import java.sql.*;
5142
import java.time.LocalDateTime;
43+
import java.util.ArrayList;
5244
import java.util.Calendar;
5345
import java.util.List;
5446
import java.util.Map;
@@ -107,16 +99,16 @@ protected void reloadCache() {
10799
}
108100

109101
@Override
110-
public void flatMap(Tuple2<Boolean, Row> value, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
102+
public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
111103
List<Integer> equalValIndex = sideInfo.getEqualValIndex();
112104
ArrayList<Object> inputParams = equalValIndex.stream()
113-
.map(value.f1::getField)
105+
.map(value::getField)
114106
.filter(object -> null != object)
115107
.collect(Collectors.toCollection(ArrayList::new));
116108

117109
if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) {
118-
Row row = fillData(value.f1, null);
119-
RowDataComplete.collectTupleRow(out, Tuple2.of(value.f0, row));
110+
Row row = fillData(value, null);
111+
RowDataComplete.collectRow(out, row);
120112
return;
121113
}
122114

@@ -126,11 +118,11 @@ public void flatMap(Tuple2<Boolean, Row> value, Collector<Tuple2<Boolean, BaseRo
126118

127119
List<Map<String, Object>> cacheList = cacheRef.get().get(cacheKey);
128120
if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) {
129-
out.collect(Tuple2.of(value.f0, RowDataConvert.convertToBaseRow(fillData(value.f1, null))));
121+
Row row = fillData(value, null);
122+
RowDataComplete.collectRow(out, row);
130123
} else if (!CollectionUtils.isEmpty(cacheList)) {
131-
cacheList.stream().forEach(one -> out.collect(Tuple2.of(value.f0, RowDataConvert.convertToBaseRow(fillData(value.f1, one)))));
124+
cacheList.stream().forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one))));
132125
}
133-
134126
}
135127

136128
@Override

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,12 @@ protected void init(BaseSideInfo sideInfo) {
111111
}
112112

113113
@Override
114-
protected void preInvoke(Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) {
114+
protected void preInvoke(Row input, ResultFuture<BaseRow> resultFuture) {
115115

116116
}
117117

118118
@Override
119-
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
119+
public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
120120

121121
AtomicLong networkLogCounter = new AtomicLong(0L);
122122
while (!connectionStatus.get()) {//network is unhealth
@@ -129,7 +129,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean, R
129129
executor.execute(() -> connectWithRetry(params, input, resultFuture, rdbSqlClient));
130130
}
131131

132-
private void connectWithRetry(Map<String, Object> inputParams, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture, SQLClient rdbSqlClient) {
132+
private void connectWithRetry(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture, SQLClient rdbSqlClient) {
133133
AtomicLong failCounter = new AtomicLong(0);
134134
AtomicBoolean finishFlag = new AtomicBoolean(false);
135135
while (!finishFlag.get()) {
@@ -262,7 +262,7 @@ public void setRdbSqlClient(SQLClient rdbSqlClient) {
262262
this.rdbSqlClient = rdbSqlClient;
263263
}
264264

265-
private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) {
265+
private void handleQuery(SQLConnection connection, Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) {
266266
String key = buildCacheKey(inputParams);
267267
JsonArray params = new JsonArray(Lists.newArrayList(inputParams.values()));
268268
connection.queryWithParams(sideInfo.getSqlCondition(), params, rs -> {
@@ -275,21 +275,20 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
275275

276276
int resultSize = rs.result().getResults().size();
277277
if (resultSize > 0) {
278-
List<Tuple2<Boolean, Row>> rowList = Lists.newArrayList();
278+
List<Row> rowList = Lists.newArrayList();
279279

280280
for (JsonArray line : rs.result().getResults()) {
281-
Row row = fillData(input.f1, line);
281+
Row row = fillData(input, line);
282282
if (openCache()) {
283283
cacheContent.add(line);
284284
}
285-
rowList.add(new Tuple2<Boolean, Row>(input.f0, row));
285+
rowList.add(row);
286286
}
287287

288288
if (openCache()) {
289289
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
290290
}
291-
292-
RowDataComplete.completeTupleRows(resultFuture, rowList);
291+
RowDataComplete.completeRow(resultFuture, rowList);
293292
} else {
294293
dealMissKey(input, resultFuture);
295294
if (openCache()) {

0 commit comments

Comments
 (0)