Skip to content

Commit bfce8c8

Browse files
author
dapeng
committed
修复退出逻辑
1 parent 23b410c commit bfce8c8

1 file changed

Lines changed: 9 additions & 7 deletions

File tree

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,30 +93,32 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
9393
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
9494
AtomicInteger failCounter = new AtomicInteger(0);
9595
AtomicReference<Throwable> connErrMsg = new AtomicReference<>();
96-
while(true){
96+
AtomicBoolean finishFlag = new AtomicBoolean(false);
97+
while(!finishFlag.get()){
9798
AtomicBoolean connectFinish = new AtomicBoolean(false);
9899
rdbSqlClient.getConnection(conn -> {
99-
connectFinish.set(true);
100100
if(conn.failed()){
101+
connectFinish.set(true);
101102
if(failCounter.getAndIncrement() % 1000 == 0){
102103
logger.error("getConnection error", conn.cause());
103104
}
105+
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3)){
106+
resultFuture.completeExceptionally(connErrMsg.get());
107+
finishFlag.set(true);
108+
}
104109
connErrMsg.set(conn.cause());
105110
conn.result().close();
111+
return;
106112
}
107113
ScheduledFuture<?> timerFuture = registerTimer(input, resultFuture);
108114
cancelTimerWhenComplete(resultFuture, timerFuture);
109115
handleQuery(conn.result(), inputParams, input, resultFuture);
116+
finishFlag.set(true);
110117
});
111118

112119
while(!connectFinish.get()){
113120
Thread.sleep(50);
114121
}
115-
116-
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3)){
117-
resultFuture.completeExceptionally(connErrMsg.get());
118-
return;
119-
}
120122
}
121123
}
122124

0 commit comments

Comments
 (0)