3030import io .vertx .ext .sql .SQLClient ;
3131import io .vertx .ext .sql .SQLConnection ;
3232import com .google .common .collect .Lists ;
33+ import org .apache .commons .lang .exception .ExceptionUtils ;
3334import org .apache .commons .lang3 .StringUtils ;
3435import org .apache .flink .streaming .api .functions .async .ResultFuture ;
3536import org .apache .flink .table .runtime .types .CRow ;
4546import java .util .concurrent .ScheduledFuture ;
4647import java .util .concurrent .atomic .AtomicBoolean ;
4748import java .util .concurrent .atomic .AtomicInteger ;
49+ import java .util .concurrent .atomic .AtomicReference ;
4850
4951/**
5052 * Date: 2018/11/26
@@ -90,6 +92,7 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
9092 @ Override
9193 public void handleAsyncInvoke (Map <String , Object > inputParams , CRow input , ResultFuture <CRow > resultFuture ) throws Exception {
9294 AtomicInteger failCounter = new AtomicInteger (0 );
95+ AtomicReference <Throwable > connErrMsg = new AtomicReference <>();
9396 while (true ){
9497 AtomicBoolean connectFinish = new AtomicBoolean (false );
9598 rdbSqlClient .getConnection (conn -> {
@@ -98,17 +101,20 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
98101 if (failCounter .getAndIncrement () % 1000 == 0 ){
99102 logger .error ("getConnection error" , conn .cause ());
100103 }
104+ connErrMsg .set (conn .cause ());
101105 conn .result ().close ();
102106 }
103107 ScheduledFuture <?> timerFuture = registerTimer (input , resultFuture );
104108 cancelTimerWhenComplete (resultFuture , timerFuture );
105109 handleQuery (conn .result (), inputParams , input , resultFuture );
106110 });
111+
107112 while (!connectFinish .get ()){
108113 Thread .sleep (50 );
109114 }
115+
110116 if (failCounter .get () >= sideInfo .getSideTableInfo ().getAsyncFailMaxNum (3 )){
111- resultFuture .completeExceptionally (new RuntimeException ( "connection fail" ));
117+ resultFuture .completeExceptionally (connErrMsg . get ( ));
112118 return ;
113119 }
114120 }
0 commit comments