Skip to content

Commit ae796a0

Browse files
committed
fix redmine 26199
1 parent 622b4b8 commit ae796a0

12 files changed

Lines changed: 296 additions & 149 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.google.common.collect.Lists;
2525
import org.apache.flink.api.common.typeinfo.TypeInformation;
2626
import org.apache.flink.api.java.typeutils.RowTypeInfo;
27+
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
28+
import org.apache.flink.table.types.logical.LogicalType;
29+
import org.apache.flink.table.types.utils.TypeConversions;
2730

2831
import java.io.Serializable;
2932
import java.util.List;
@@ -96,6 +99,17 @@ public RowTypeInfo getRowTypeInfo(){
9699
return new RowTypeInfo(types, fieldNames);
97100
}
98101

102+
public BaseRowTypeInfo getBaseRowTypeInfo(){
103+
String[] fieldNames = getFields();
104+
Class[] fieldClass = getFieldClasses();
105+
LogicalType[] logicalTypes = new LogicalType[fieldClass.length];
106+
for (int i = 0; i < fieldClass.length; i++) {
107+
logicalTypes[i] = TypeConversions.fromLegacyInfoToDataType(TypeInformation.of(fieldClass[i])).getLogicalType();
108+
}
109+
110+
return new BaseRowTypeInfo(logicalTypes, fieldNames);
111+
}
112+
99113
public String getCacheType() {
100114
return cacheType;
101115
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2424
import org.apache.flink.api.java.tuple.Tuple2;
2525
import org.apache.flink.configuration.Configuration;
26+
import org.apache.flink.table.dataformat.BaseRow;
2627
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2728
import org.apache.flink.types.Row;
2829
import org.apache.flink.util.Collector;
@@ -46,7 +47,7 @@
4647
* @author xuchao
4748
*/
4849

49-
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,Row>> implements ISideReqRow {
50+
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,BaseRow>> implements ISideReqRow {
5051

5152
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
5253

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
2727
import com.dtstack.flink.sql.side.cache.CacheObj;
2828
import com.dtstack.flink.sql.side.cache.LRUSideCache;
29+
import com.dtstack.flink.sql.util.RowDataConvert;
2930
import com.dtstack.flink.sql.util.ReflectionUtils;
3031
import com.google.common.collect.Lists;
3132
import com.google.common.collect.Maps;
@@ -37,8 +38,8 @@
3738
import org.apache.flink.metrics.Counter;
3839
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3940
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
41+
import org.apache.flink.table.dataformat.BaseRow;
4042
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
41-
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
4243
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
4344
import org.apache.flink.table.api.DataTypes;
4445
import org.apache.flink.types.Row;
@@ -62,7 +63,7 @@
6263
* @author xuchao
6364
*/
6465

65-
public abstract class BaseAsyncReqRow extends RichAsyncFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,Row>> implements ISideReqRow {
66+
public abstract class BaseAsyncReqRow extends RichAsyncFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,BaseRow>> implements ISideReqRow {
6667
private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
6768
private static final long serialVersionUID = 2098635244857937717L;
6869
private RuntimeContext runtimeContext;
@@ -131,12 +132,13 @@ protected boolean openCache(){
131132
return sideInfo.getSideCache() != null;
132133
}
133134

134-
protected void dealMissKey(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
135+
protected void dealMissKey(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture){
135136
if(sideInfo.getJoinType() == JoinType.LEFT){
136137
//Reserved left table data
137138
try {
138139
Row row = fillData(input.f1, null);
139-
resultFuture.complete(Collections.singleton(new Tuple2<>(input.f0, row)));
140+
BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
141+
resultFuture.complete(Collections.singleton(new Tuple2<>(input.f0, baseRow)));
140142
} catch (Exception e) {
141143
dealFillDataError(input, resultFuture, e);
142144
}
@@ -152,7 +154,7 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
152154
}
153155

154156
@Override
155-
public void timeout(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
157+
public void timeout(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture) throws Exception {
156158

157159
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
158160
LOG.info("Async function call has timed out. input:{}, timeOutNum:{}",input.toString(), timeOutNum);
@@ -169,13 +171,13 @@ public void timeout(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>>
169171
resultFuture.complete(null);
170172
}
171173

172-
protected void preInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture)
174+
protected void preInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture)
173175
throws InvocationTargetException, IllegalAccessException {
174176
registerTimerAndAddToHandler(input, resultFuture);
175177
}
176178

177179
@Override
178-
public void asyncInvoke(Tuple2<Boolean,Row> row, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
180+
public void asyncInvoke(Tuple2<Boolean,Row> row, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture) throws Exception {
179181
Tuple2<Boolean,Row> input = Tuple2.of(row.f0, Row.copy(row.f1));
180182
preInvoke(input, resultFuture);
181183
Map<String, Object> inputParams = parseInputParam(input);
@@ -208,7 +210,7 @@ protected boolean isUseCache(Map<String, Object> inputParams){
208210
return openCache() && getFromCache(buildCacheKey(inputParams)) != null;
209211
}
210212

211-
private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
213+
private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture){
212214
if (openCache()) {
213215
CacheObj val = getFromCache(buildCacheKey(inputParams));
214216
if (val != null) {
@@ -218,16 +220,18 @@ private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean,Row
218220
}else if(ECacheContentType.SingleLine == val.getType()){
219221
try {
220222
Row row = fillData(input.f1, val.getContent());
221-
resultFuture.complete(Collections.singleton(Tuple2.of(input.f0, row)));
223+
BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
224+
resultFuture.complete(Collections.singleton(Tuple2.of(input.f0, baseRow)));
222225
} catch (Exception e) {
223226
dealFillDataError(input, resultFuture, e);
224227
}
225228
} else if (ECacheContentType.MultiLine == val.getType()) {
226229
try {
227-
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
230+
List<Tuple2<Boolean,BaseRow>> rowList = Lists.newArrayList();
228231
for (Object one : (List) val.getContent()) {
229232
Row row = fillData(input.f1, one);
230-
rowList.add(Tuple2.of(input.f0, row));
233+
BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
234+
rowList.add(Tuple2.of(input.f0, baseRow));
231235
}
232236
resultFuture.complete(rowList);
233237
} catch (Exception e) {
@@ -241,22 +245,22 @@ private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean,Row
241245
}
242246
}
243247

244-
public abstract void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception;
248+
public abstract void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture) throws Exception;
245249

246250
public abstract String buildCacheKey(Map<String, Object> inputParams);
247251

248252
private ProcessingTimeService getProcessingTimeService(){
249253
return ((StreamingRuntimeContext)this.runtimeContext).getProcessingTimeService();
250254
}
251255

252-
protected ScheduledFuture<?> registerTimer(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
256+
protected ScheduledFuture<?> registerTimer(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture){
253257
long timeoutTimestamp = sideInfo.getSideTableInfo().getAsyncTimeout() + getProcessingTimeService().getCurrentProcessingTime();
254258
return getProcessingTimeService().registerTimer(
255259
timeoutTimestamp,
256260
timestamp -> timeout(input, resultFuture));
257261
}
258262

259-
protected void registerTimerAndAddToHandler(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture)
263+
protected void registerTimerAndAddToHandler(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture)
260264
throws InvocationTargetException, IllegalAccessException {
261265
ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
262266
// resultFuture 是ResultHandler 的实例
@@ -266,7 +270,7 @@ protected void registerTimerAndAddToHandler(Tuple2<Boolean,Row> input, ResultFut
266270
}
267271

268272

269-
protected void dealFillDataError(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture, Throwable e) {
273+
protected void dealFillDataError(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture, Throwable e) {
270274
parseErrorRecords.inc();
271275
if(parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){
272276
LOG.info("dealFillDataError", e);

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side;
2221

2322
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
import org.apache.flink.table.types.logical.LogicalType;
2424

2525
import java.io.Serializable;
2626
import java.util.Objects;
@@ -29,6 +29,7 @@
2929
* Reason:
3030
* Date: 2018/7/23
3131
* Company: www.dtstack.com
32+
*
3233
* @author xuchao
3334
*/
3435

@@ -42,6 +43,8 @@ public class FieldInfo implements Serializable {
4243

4344
private TypeInformation typeInformation;
4445

46+
private LogicalType logicalType;
47+
4548
public String getTable() {
4649
return table;
4750
}
@@ -66,6 +69,15 @@ public void setTypeInformation(TypeInformation typeInformation) {
6669
this.typeInformation = typeInformation;
6770
}
6871

72+
public LogicalType getLogicalType() {
73+
return logicalType;
74+
}
75+
76+
public void setLogicalType(LogicalType logicalType) {
77+
this.logicalType = logicalType;
78+
}
79+
80+
6981
@Override
7082
public boolean equals(Object o) {
7183
if (this == o) {

core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side;
2221

@@ -29,10 +28,15 @@
2928
import java.util.List;
3029
import java.util.Map;
3130

31+
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
32+
import org.apache.flink.table.types.logical.LogicalType;
33+
34+
3235
/**
3336
* Reason:
3437
* Date: 2018/7/20
3538
* Company: www.dtstack.com
39+
*
3640
* @author xuchao
3741
*/
3842

@@ -42,42 +46,59 @@ public class JoinScope {
4246

4347
private Map<String, ScopeChild> aliasMap = Maps.newHashMap();
4448

45-
public void addScope(ScopeChild scopeChild){
49+
public void addScope(ScopeChild scopeChild) {
4650
children.add(scopeChild);
4751
aliasMap.put(scopeChild.getAlias(), scopeChild);
4852
}
4953

50-
public ScopeChild getScope(String tableAlias){
54+
public ScopeChild getScope(String tableAlias) {
5155
return aliasMap.get(tableAlias);
5256
}
5357

5458
public List<ScopeChild> getChildren() {
5559
return children;
5660
}
5761

58-
public TypeInformation getFieldType(String tableName, String fieldName){
62+
public TypeInformation getFieldType(String tableName, String fieldName) {
5963
ScopeChild scopeChild = aliasMap.get(tableName);
60-
if(scopeChild == null){
64+
if (scopeChild == null) {
6165
throw new RuntimeException("can't find ");
6266
}
6367

6468
RowTypeInfo rowTypeInfo = scopeChild.getRowTypeInfo();
6569
int index = rowTypeInfo.getFieldIndex(fieldName);
66-
if(index == -1){
70+
if (index == -1) {
6771
throw new RuntimeException("can't find field: " + fieldName);
6872
}
6973

7074
return rowTypeInfo.getTypeAt(index);
7175
}
7276

73-
public static class ScopeChild{
77+
public LogicalType getLogicalType(String tableName, String fieldName) {
78+
ScopeChild scopeChild = aliasMap.get(tableName);
79+
if (scopeChild == null) {
80+
throw new RuntimeException("can't find ");
81+
}
82+
83+
BaseRowTypeInfo rowTypeInfo = scopeChild.getBaseRowTypeInfo();
84+
int index = rowTypeInfo.getFieldIndex(fieldName);
85+
if (index == -1) {
86+
throw new RuntimeException("can't find field: " + fieldName);
87+
}
88+
89+
return rowTypeInfo.getLogicalTypes()[index];
90+
}
91+
92+
public static class ScopeChild {
7493

7594
private String alias;
7695

7796
private String tableName;
7897

7998
private RowTypeInfo rowTypeInfo;
8099

100+
private BaseRowTypeInfo baseRowTypeInfo;
101+
81102
public String getAlias() {
82103
return alias;
83104
}
@@ -101,5 +122,13 @@ public RowTypeInfo getRowTypeInfo() {
101122
public void setRowTypeInfo(RowTypeInfo rowTypeInfo) {
102123
this.rowTypeInfo = rowTypeInfo;
103124
}
125+
126+
public BaseRowTypeInfo getBaseRowTypeInfo() {
127+
return baseRowTypeInfo;
128+
}
129+
130+
public void setBaseRowTypeInfo(BaseRowTypeInfo baseRowTypeInfo) {
131+
this.baseRowTypeInfo = baseRowTypeInfo;
132+
}
104133
}
105134
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.flink.api.common.typeinfo.TypeInformation;
2929
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3030
import com.google.common.collect.Lists;
31+
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
32+
import org.apache.flink.table.types.logical.LogicalType;
3133

3234
import java.util.Iterator;
3335
import java.util.List;
@@ -68,10 +70,12 @@ public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, b
6870
String tableName = identifier.getComponent(0).getSimple();
6971
String fieldName = identifier.getComponent(1).getSimple();
7072
TypeInformation<?> type = scope.getFieldType(tableName, fieldName);
73+
LogicalType logicalType = scope.getLogicalType(tableName, fieldName);
7174
FieldInfo fieldInfo = new FieldInfo();
7275
fieldInfo.setTable(tableName);
7376
fieldInfo.setFieldName(fieldName);
7477
fieldInfo.setTypeInformation(type);
78+
fieldInfo.setLogicalType(logicalType);
7579
fieldInfoList.add(fieldInfo);
7680
} else {
7781
//处理
@@ -89,15 +93,19 @@ public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, b
8993
}
9094

9195
RowTypeInfo field = scopeChild.getRowTypeInfo();
96+
BaseRowTypeInfo baseRowTypeInfo = scopeChild.getBaseRowTypeInfo();
9297
String[] fieldNames = field.getFieldNames();
9398
TypeInformation<?>[] types = field.getFieldTypes();
99+
LogicalType[] logicalTypes = baseRowTypeInfo.getLogicalTypes();
94100
for(int i=0; i< field.getTotalFields(); i++){
95101
String fieldName = fieldNames[i];
96102
TypeInformation<?> type = types[i];
103+
LogicalType logicalType = logicalTypes[i];
97104
FieldInfo fieldInfo = new FieldInfo();
98105
fieldInfo.setTable(tableIdentify.getSimple());
99106
fieldInfo.setFieldName(fieldName);
100107
fieldInfo.setTypeInformation(type);
108+
fieldInfo.setLogicalType(logicalType);
101109
fieldInfoList.add(fieldInfo);
102110
}
103111
break;
@@ -115,21 +123,26 @@ private static List<FieldInfo> getAllField(JoinScope scope){
115123
while(true) {
116124
JoinScope.ScopeChild resolved;
117125
RowTypeInfo field;
126+
BaseRowTypeInfo baseRowTypeInfo;
118127
if(!prefixId.hasNext()) {
119128
return fieldInfoList;
120129
}
121130

122131
resolved = (JoinScope.ScopeChild)prefixId.next();
123132
field = resolved.getRowTypeInfo();
133+
baseRowTypeInfo = resolved.getBaseRowTypeInfo();
124134
String[] fieldNames = field.getFieldNames();
125135
TypeInformation<?>[] types = field.getFieldTypes();
136+
LogicalType[] logicalTypes = baseRowTypeInfo.getLogicalTypes();
126137
for(int i=0; i< field.getTotalFields(); i++){
127138
String fieldName = fieldNames[i];
128139
TypeInformation<?> type = types[i];
140+
LogicalType logicalType = logicalTypes[i];
129141
FieldInfo fieldInfo = new FieldInfo();
130142
fieldInfo.setTable(resolved.getAlias());
131143
fieldInfo.setFieldName(fieldName);
132144
fieldInfo.setTypeInformation(type);
145+
fieldInfo.setLogicalType(logicalType);
133146
fieldInfoList.add(fieldInfo);
134147
}
135148
}

0 commit comments

Comments
 (0)