Skip to content

Commit 0ba5727

Browse files
author
xuchao
committed
Merge remote-tracking branch 'origin/v1.10.0_dev' into v1.10.0_dev
2 parents 7c3f1ab + 6bc8c4d commit 0ba5727

2 files changed

Lines changed: 5 additions & 12 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ target/
88
*.eclipse.*
99
*.iml
1010
plugins/
11+
sqlplugins/
1112
lib/
1213
.vertx/
1314
.DS_Store

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,9 @@
3838
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3939
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
4040
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
41-
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
4241
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
4342
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
44-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
43+
import org.apache.flink.table.api.DataTypes;
4544
import org.apache.flink.types.Row;
4645
import org.slf4j.Logger;
4746
import org.slf4j.LoggerFactory;
@@ -53,10 +52,7 @@
5352
import java.util.Collections;
5453
import java.util.List;
5554
import java.util.Map;
56-
import java.util.concurrent.LinkedBlockingQueue;
5755
import java.util.concurrent.ScheduledFuture;
58-
import java.util.concurrent.ThreadPoolExecutor;
59-
import java.util.concurrent.TimeUnit;
6056

6157
/**
6258
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -114,7 +110,7 @@ private void initMetric() {
114110

115111

116112
protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
117-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(index).getClass());
113+
boolean isTimeIndicatorTypeInfo = DataTypes.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(index).getClass());
118114

119115
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
120116
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
@@ -257,19 +253,15 @@ protected ScheduledFuture<?> registerTimer(Tuple2<Boolean,Row> input, ResultFutu
257253
long timeoutTimestamp = sideInfo.getSideTableInfo().getAsyncTimeout() + getProcessingTimeService().getCurrentProcessingTime();
258254
return getProcessingTimeService().registerTimer(
259255
timeoutTimestamp,
260-
new ProcessingTimeCallback() {
261-
@Override
262-
public void onProcessingTime(long timestamp) throws Exception {
263-
timeout(input, resultFuture);
264-
}
265-
});
256+
timestamp -> timeout(input, resultFuture));
266257
}
267258

268259
protected void registerTimerAndAddToHandler(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture)
269260
throws InvocationTargetException, IllegalAccessException {
270261
ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
271262
// resultFuture 是ResultHandler 的实例
272263
Method setTimeoutTimer = ReflectionUtils.getDeclaredMethod(resultFuture, "setTimeoutTimer", ScheduledFuture.class);
264+
setTimeoutTimer.setAccessible(true);
273265
setTimeoutTimer.invoke(resultFuture, timeFuture);
274266
}
275267

0 commit comments

Comments
 (0)