@@ -273,8 +273,8 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
273273
274274 RowTypeInfo typeInfo = new RowTypeInfo (adaptTable .getSchema ().getTypes (), adaptTable .getSchema ().getColumnNames ());
275275 DataStream adaptStream = tableEnv .toRetractStream (adaptTable , typeInfo )
276- .map ((Tuple2 <Boolean , Row > f0 ) -> { return f0 .f1 ; })
277- .returns (typeInfo );
276+ .map ((Tuple2 <Boolean , Row > f0 ) -> { return f0 .f1 ; })
277+ .returns (typeInfo );
278278
279279 String fields = String .join ("," , typeInfo .getFieldNames ());
280280
@@ -288,18 +288,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
288288 Table regTable = tableEnv .fromDataStream (adaptStream , fields );
289289 tableEnv .registerTable (tableInfo .getName (), regTable );
290290 registerTableCache .put (tableInfo .getName (), regTable );
291- classPathSet .add (PluginUtil .getRemoteJarFilePath (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , remoteSqlPluginPath ));
291+ classPathSet .add (PluginUtil .getRemoteJarFilePath (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , remoteSqlPluginPath , localSqlPluginPath ));
292292 } else if (tableInfo instanceof TargetTableInfo ) {
293293
294294 TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
295295 TypeInformation [] flinkTypes = FlinkUtil .transformTypes (tableInfo .getFieldClasses ());
296296 tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
297- classPathSet .add ( PluginUtil .getRemoteJarFilePath (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , remoteSqlPluginPath ));
297+ classPathSet .add ( PluginUtil .getRemoteJarFilePath (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , remoteSqlPluginPath , localSqlPluginPath ));
298298 } else if (tableInfo instanceof SideTableInfo ){
299299
300300 String sideOperator = ECacheType .ALL .name ().equals (((SideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
301301 sideTableMap .put (tableInfo .getName (), (SideTableInfo ) tableInfo );
302- classPathSet .add (PluginUtil .getRemoteSideJarFilePath (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , remoteSqlPluginPath ));
302+ classPathSet .add (PluginUtil .getRemoteSideJarFilePath (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , remoteSqlPluginPath , localSqlPluginPath ));
303303 }else {
304304 throw new RuntimeException ("not support table type:" + tableInfo .getType ());
305305 }
0 commit comments