File tree Expand file tree Collapse file tree
core/src/main/java/com/dtstack/flink/sql Expand file tree Collapse file tree Original file line number Diff line number Diff line change 4040import org .apache .flink .streaming .runtime .tasks .ProcessingTimeService ;
4141import org .apache .flink .table .api .DataTypes ;
4242import org .apache .flink .table .dataformat .BaseRow ;
43+ import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
4344import org .apache .flink .types .Row ;
4445import org .slf4j .Logger ;
4546import org .slf4j .LoggerFactory ;
@@ -112,7 +113,7 @@ private void initMetric() {
112113
113114
114115 protected Object convertTimeIndictorTypeInfo (Integer index , Object obj ) {
115- boolean isTimeIndicatorTypeInfo = DataTypes .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (index ).getClass ());
116+ boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo .class .isAssignableFrom (sideInfo .getRowTypeInfo ().getTypeAt (index ).getClass ());
116117
117118 //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
118119 if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo ) {
Original file line number Diff line number Diff line change 33import org .apache .flink .table .dataformat .BaseRow ;
44import org .apache .flink .table .dataformat .BinaryString ;
55import org .apache .flink .table .dataformat .GenericRow ;
6+ import org .apache .flink .table .dataformat .SqlTimestamp ;
67import org .apache .flink .types .Row ;
78
89import java .sql .Timestamp ;
@@ -23,7 +24,8 @@ public static BaseRow convertToBaseRow(Row row){
2324 if (row .getField (i ) instanceof String ){
2425 genericRow .setField (i , BinaryString .fromString ((String )row .getField (i )));
2526 } else if (row .getField (i ) instanceof Timestamp ){
26- genericRow .setField (i , ((Timestamp )row .getField (i )).getTime ());
27+ SqlTimestamp newTimestamp = SqlTimestamp .fromTimestamp (((Timestamp )row .getField (i )));
28+ genericRow .setField (i , newTimestamp );
2729 }else {
2830 genericRow .setField (i , row .getField (i ));
2931 }
You can’t perform that action at this time.
0 commit comments