|
25 | 25 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
26 | 26 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
27 | 27 | import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo; |
| 28 | +import org.apache.flink.table.types.DataType; |
| 29 | +import org.apache.flink.table.types.logical.DecimalType; |
28 | 30 | import org.apache.flink.table.types.logical.LogicalType; |
| 31 | +import org.apache.flink.table.types.utils.ClassDataTypeConverter; |
29 | 32 | import org.apache.flink.table.types.utils.TypeConversions; |
30 | 33 |
|
31 | 34 | import java.io.Serializable; |
| 35 | +import java.math.BigDecimal; |
32 | 36 | import java.util.List; |
33 | 37 | import java.util.Objects; |
| 38 | +import java.util.Optional; |
34 | 39 |
|
35 | 40 | /** |
36 | 41 | * Reason: |
@@ -104,7 +109,17 @@ public BaseRowTypeInfo getBaseRowTypeInfo(){ |
104 | 109 | Class[] fieldClass = getFieldClasses(); |
105 | 110 | LogicalType[] logicalTypes = new LogicalType[fieldClass.length]; |
106 | 111 | for (int i = 0; i < fieldClass.length; i++) { |
107 | | - logicalTypes[i] = TypeConversions.fromLegacyInfoToDataType(TypeInformation.of(fieldClass[i])).getLogicalType(); |
| 112 | + if(fieldClass[i].getName().equals(BigDecimal.class.getName())){ |
| 113 | + logicalTypes[i] = new DecimalType(DecimalType.MAX_PRECISION, 18); |
| 114 | + continue; |
| 115 | + } |
| 116 | + |
| 117 | + Optional<DataType> optionalDataType = ClassDataTypeConverter.extractDataType(fieldClass[i]); |
| 118 | + if(!optionalDataType.isPresent()){ |
| 119 | + throw new RuntimeException(String.format("not support table % field %s type %s", getName(), fieldNames[i], fieldClass[i])); |
| 120 | + } |
| 121 | + |
| 122 | + logicalTypes[i] = optionalDataType.get().getLogicalType(); |
108 | 123 | } |
109 | 124 |
|
110 | 125 | return new BaseRowTypeInfo(logicalTypes, fieldNames); |
|
0 commit comments