Skip to content

Commit 897cb0b

Browse files
committed
Merge branch 'hotfix_1.10_4.0.x_31674' into '1.10_release_4.0.x'
[fix] decimal type transform error when table join See merge request dt-insight-engine/flinkStreamSQL!173
2 parents 9bdcfa3 + 9b3dc31 commit 897cb0b

1 file changed

Lines changed: 30 additions & 12 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.calcite.sql.parser.SqlParseException;
4444
import org.apache.commons.collections.CollectionUtils;
4545
import org.apache.commons.lang3.StringUtils;
46+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
4647
import org.apache.flink.api.common.typeinfo.TypeInformation;
4748
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4849
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -52,16 +53,27 @@
5253
import org.apache.flink.table.api.java.StreamTableEnvironment;
5354
import org.apache.flink.table.catalog.ObjectIdentifier;
5455
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
56+
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
57+
import org.apache.flink.table.types.logical.DecimalType;
58+
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
5559
import org.apache.flink.table.types.logical.LogicalType;
5660
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5761
import org.apache.flink.types.Row;
5862
import org.slf4j.Logger;
5963
import org.slf4j.LoggerFactory;
6064

6165
import java.time.LocalDateTime;
62-
import java.util.*;
63-
64-
import static org.apache.calcite.sql.SqlKind.*;
66+
import java.util.Arrays;
67+
import java.util.LinkedList;
68+
import java.util.List;
69+
import java.util.Map;
70+
import java.util.Queue;
71+
import java.util.Set;
72+
73+
import static org.apache.calcite.sql.SqlKind.AS;
74+
import static org.apache.calcite.sql.SqlKind.INSERT;
75+
import static org.apache.calcite.sql.SqlKind.SELECT;
76+
import static org.apache.calcite.sql.SqlKind.WITH_ITEM;
6577

6678
/**
6779
* Reason:
@@ -284,10 +296,7 @@ private Table getTableFromCache(Map<String, Table> localTableCache, String table
284296
*/
285297
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, AbstractSideTableInfo sideTableInfo) {
286298
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
287-
if (CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))) {
288-
return true;
289-
}
290-
return false;
299+
return CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo));
291300
}
292301

293302
private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
@@ -372,8 +381,12 @@ private void joinFun(Object pollObj,
372381

373382
int length = leftTable.getSchema().getFieldDataTypes().length;
374383
LogicalType[] logicalTypes = new LogicalType[length];
375-
for(int i=0; i<length; i++){
384+
for (int i = 0; i < length; i++) {
376385
logicalTypes[i] = leftTable.getSchema().getFieldDataTypes()[i].getLogicalType();
386+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
387+
((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
388+
logicalTypes[i] = new DecimalType(38, 18);
389+
}
377390
}
378391

379392
BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -412,7 +425,14 @@ private void joinFun(Object pollObj,
412425
targetTable = localTableCache.get(joinInfo.getLeftTableName());
413426
}
414427

415-
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
428+
TypeInformation[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
429+
for (int i = 0; i < fieldDataTypes.length; i++) {
430+
if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
431+
fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
432+
}
433+
}
434+
435+
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
416436

417437
DataStream adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
418438
.filter(f -> f.f0)
@@ -484,9 +504,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
484504
String fieldType = filed[filed.length - 1].trim();
485505
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
486506
Class tableField = table.getSchema().getFieldType(i).get().getTypeClass();
487-
if (fieldClass == tableField) {
488-
continue;
489-
} else {
507+
if (fieldClass != tableField) {
490508
return false;
491509
}
492510
}

0 commit comments

Comments
 (0)