File tree Expand file tree Collapse file tree
core/src/main/java/com/dtstack/flink/sql/side Expand file tree Collapse file tree Original file line number Diff line number Diff line change 7676import java .util .Optional ;
7777import java .util .Queue ;
7878import java .util .Set ;
79+ import java .util .concurrent .atomic .AtomicBoolean ;
7980
8081import static org .apache .calcite .sql .SqlKind .AS ;
8182import static org .apache .calcite .sql .SqlKind .INSERT ;
@@ -562,7 +563,13 @@ private void joinFun(Object pollObj,
562563 EnvironmentSettings .DEFAULT_BUILTIN_CATALOG ,
563564 EnvironmentSettings .DEFAULT_BUILTIN_DATABASE ,
564565 targetTableName );
565- boolean tableExists = tableEnv .getCatalog (EnvironmentSettings .DEFAULT_BUILTIN_CATALOG ).get ().tableExists (objectIdentifier .toObjectPath ());
566+ boolean tableExists = false ;
567+ for (String table : tableEnv .listTables ()) {
568+ if (table .equals (targetTableName )) {
569+ tableExists = true ;
570+ break ;
571+ }
572+ }
566573
567574 if (!tableExists ) {
568575 Table joinTable = tableEnv .fromDataStream (dsOut );
You can’t perform that action at this time.
0 commit comments