Skip to content

Commit 9b3dc31

Browse files
committed
[fix] decimal type transform error when table join
1 parent 59e83f8 commit 9b3dc31

1 file changed

Lines changed: 36 additions & 13 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,16 @@
3434
import com.google.common.collect.Lists;
3535
import com.google.common.collect.Maps;
3636
import com.google.common.collect.Sets;
37-
import org.apache.calcite.sql.*;
37+
import org.apache.calcite.sql.SqlBasicCall;
38+
import org.apache.calcite.sql.SqlIdentifier;
39+
import org.apache.calcite.sql.SqlKind;
40+
import org.apache.calcite.sql.SqlNode;
41+
import org.apache.calcite.sql.SqlSelect;
42+
import org.apache.calcite.sql.SqlWithItem;
3843
import org.apache.calcite.sql.parser.SqlParseException;
3944
import org.apache.commons.collections.CollectionUtils;
4045
import org.apache.commons.lang3.StringUtils;
46+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
4147
import org.apache.flink.api.common.typeinfo.TypeInformation;
4248
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4349
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,16 +53,27 @@
4753
import org.apache.flink.table.api.java.StreamTableEnvironment;
4854
import org.apache.flink.table.catalog.ObjectIdentifier;
4955
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
56+
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
57+
import org.apache.flink.table.types.logical.DecimalType;
58+
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
5059
import org.apache.flink.table.types.logical.LogicalType;
5160
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5261
import org.apache.flink.types.Row;
5362
import org.slf4j.Logger;
5463
import org.slf4j.LoggerFactory;
5564

5665
import java.time.LocalDateTime;
57-
import java.util.*;
58-
59-
import static org.apache.calcite.sql.SqlKind.*;
66+
import java.util.Arrays;
67+
import java.util.LinkedList;
68+
import java.util.List;
69+
import java.util.Map;
70+
import java.util.Queue;
71+
import java.util.Set;
72+
73+
import static org.apache.calcite.sql.SqlKind.AS;
74+
import static org.apache.calcite.sql.SqlKind.INSERT;
75+
import static org.apache.calcite.sql.SqlKind.SELECT;
76+
import static org.apache.calcite.sql.SqlKind.WITH_ITEM;
6077

6178
/**
6279
* Reason:
@@ -279,10 +296,7 @@ private Table getTableFromCache(Map<String, Table> localTableCache, String table
279296
*/
280297
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, AbstractSideTableInfo sideTableInfo) {
281298
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
282-
if (CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))) {
283-
return true;
284-
}
285-
return false;
299+
return CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo));
286300
}
287301

288302
private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
@@ -367,8 +381,12 @@ private void joinFun(Object pollObj,
367381

368382
int length = leftTable.getSchema().getFieldDataTypes().length;
369383
LogicalType[] logicalTypes = new LogicalType[length];
370-
for(int i=0; i<length; i++){
384+
for (int i = 0; i < length; i++) {
371385
logicalTypes[i] = leftTable.getSchema().getFieldDataTypes()[i].getLogicalType();
386+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
387+
((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
388+
logicalTypes[i] = new DecimalType(38, 18);
389+
}
372390
}
373391

374392
BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -407,7 +425,14 @@ private void joinFun(Object pollObj,
407425
targetTable = localTableCache.get(joinInfo.getLeftTableName());
408426
}
409427

410-
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
428+
TypeInformation[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
429+
for (int i = 0; i < fieldDataTypes.length; i++) {
430+
if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
431+
fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
432+
}
433+
}
434+
435+
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
411436

412437
DataStream adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
413438
.filter(f -> f.f0)
@@ -479,9 +504,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
479504
String fieldType = filed[filed.length - 1].trim();
480505
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
481506
Class tableField = table.getSchema().getFieldType(i).get().getTypeClass();
482-
if (fieldClass == tableField) {
483-
continue;
484-
} else {
507+
if (fieldClass != tableField) {
485508
return false;
486509
}
487510
}

0 commit comments

Comments
 (0)