Skip to content

Commit 372d55e

Browse files
committed
deal timeIndictorTypeinfo
1 parent 957b230 commit 372d55e

15 files changed

Lines changed: 48 additions & 73 deletions

File tree

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,7 @@ public Row fillData(Row input, Object sideInput) {
8686
Row row = new Row(sideInfo.getOutFieldInfoList().size());
8787
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
8888
Object obj = input.getField(entry.getValue());
89-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
90-
91-
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
92-
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
93-
obj = ((Timestamp) obj).getTime();
94-
}
89+
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
9590
row.setField(entry.getKey(), obj);
9691
}
9792

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,7 @@ public Row fillData(Row input, Object line) {
268268
Row row = new Row(sideInfo.getOutFieldInfoList().size());
269269
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
270270
Object obj = input.getField(entry.getValue());
271-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
272-
273-
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
274-
obj = ((Timestamp) obj).getTime();
275-
}
276-
271+
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
277272
row.setField(entry.getKey(), obj);
278273
}
279274

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
7373

7474
confProperties = PropertiesUtils.propertiesTrim(confProperties);
7575
streamEnv.getConfig().disableClosureCleaner();
76+
// Disables reusing object
77+
streamEnv.getConfig().enableObjectReuse();
7678

7779
Configuration globalJobParameters = new Configuration();
7880
//Configuration unsupported set properties key-value
@@ -198,8 +200,8 @@ public static Optional<TimeCharacteristic> getStreamTimeCharacteristic(Propertie
198200
}
199201

200202
public static Optional<Boolean> isCheckpointingEnabled(Properties properties) {
201-
boolean checkpointEnabled = properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY) == null
202-
&& properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null;
203+
boolean checkpointEnabled = !(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY) == null
204+
&& properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null);
203205
return Optional.of(checkpointEnabled);
204206
}
205207

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import com.dtstack.flink.sql.factory.DTThreadFactory;
2424
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2525
import org.apache.flink.configuration.Configuration;
26+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2627
import org.apache.flink.types.Row;
2728

2829
import java.sql.SQLException;
30+
import java.sql.Timestamp;
31+
import java.time.LocalDateTime;
2932
import java.util.concurrent.Executors;
3033
import java.util.concurrent.ScheduledExecutorService;
3134
import java.util.concurrent.TimeUnit;
@@ -64,4 +67,15 @@ public void open(Configuration parameters) throws Exception {
6467
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
6568
}
6669

70+
protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
71+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(index).getClass());
72+
73+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
74+
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
75+
obj = Timestamp.valueOf(((LocalDateTime) obj));
76+
}
77+
return obj;
78+
}
79+
80+
6781
}

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@
2929
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3030
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3131
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
32+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3233
import org.apache.flink.types.Row;
3334

35+
import java.sql.Timestamp;
36+
import java.time.LocalDateTime;
3437
import java.util.Collection;
3538
import java.util.Collections;
3639
import java.util.concurrent.TimeoutException;
@@ -82,6 +85,17 @@ private void initCache(){
8285
sideCache.initCache();
8386
}
8487

88+
89+
protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
90+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(index).getClass());
91+
92+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
93+
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
94+
obj = Timestamp.valueOf(((LocalDateTime) obj));
95+
}
96+
return obj;
97+
}
98+
8599
protected CacheObj getFromCache(String key){
86100
return sideInfo.getSideCache().getFromCache(key);
87101
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -238,25 +238,13 @@ public RowTypeInfo buildOutRowTypeInfo(List<FieldInfo> sideJoinFieldInfo, HashBa
238238

239239
mappingTable.put(tableName, fieldName, mappingFieldName);
240240

241-
sideOutTypes[i] = convertTimeAttributeType(fieldInfo.getTypeInformation());
241+
sideOutTypes[i] = fieldInfo.getTypeInformation();
242242
sideOutNames[i] = mappingFieldName;
243243
}
244244

245245
return new RowTypeInfo(sideOutTypes, sideOutNames);
246246
}
247247

248-
/**
249-
* 对protime和rowtime做类型转换
250-
* @param typeInformation
251-
* @return
252-
*/
253-
private TypeInformation convertTimeAttributeType(TypeInformation typeInformation) {
254-
if (typeInformation instanceof TimeIndicatorTypeInfo) {
255-
return SqlTimeTypeInfo.TIMESTAMP;
256-
}
257-
return typeInformation;
258-
}
259-
260248
//需要考虑更多的情况
261249
private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, String> mappingTable, String targetTableName, String tableAlias) {
262250
SqlKind sqlKind = sqlNode.getKind();

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.io.IOException;
4242
import java.sql.SQLException;
4343
import java.sql.Timestamp;
44+
import java.time.LocalDateTime;
4445
import java.util.*;
4546
import java.util.concurrent.atomic.AtomicReference;
4647

@@ -75,8 +76,8 @@ public Row fillData(Row input, Object sideInput) {
7576
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
7677

7778
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
78-
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
79-
obj = ((Timestamp)obj).getTime();
79+
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
80+
obj = Timestamp.valueOf(((LocalDateTime) obj));
8081
}
8182
row.setField(entry.getKey(), obj);
8283
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,7 @@ public Row fillData(Row input, Object sideInput){
167167
Row row = new Row(sideInfo.getOutFieldInfoList().size());
168168
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
169169
Object obj = input.getField(entry.getValue());
170-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
171-
172-
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
173-
obj = ((Timestamp)obj).getTime();
174-
}
175-
170+
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
176171
row.setField(entry.getKey(), obj);
177172
}
178173

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,7 @@ public Row fillData(Row input, Object sideInput) {
5858
Row row = new Row(sideInfo.getOutFieldInfoList().size());
5959
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
6060
Object obj = input.getField(entry.getValue());
61-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
62-
63-
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
64-
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
65-
obj = ((Timestamp) obj).getTime();
66-
}
61+
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
6762
row.setField(entry.getKey(), obj);
6863
}
6964

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,7 @@ public Row fillData(Row input, Object sideInput) {
179179
Row row = new Row(sideInfo.getOutFieldInfoList().size());
180180
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
181181
Object obj = input.getField(entry.getValue());
182-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
183-
184-
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
185-
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
186-
obj = ((Timestamp) obj).getTime();
187-
}
182+
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
188183
row.setField(entry.getKey(), obj);
189184
}
190185

0 commit comments

Comments
 (0)