4646import org .apache .flink .api .common .restartstrategy .RestartStrategies ;
4747import org .apache .flink .api .common .time .Time ;
4848import org .apache .flink .api .common .typeinfo .TypeInformation ;
49+ import org .apache .flink .api .java .tuple .Tuple2 ;
4950import org .apache .flink .api .java .typeutils .RowTypeInfo ;
5051import org .apache .flink .calcite .shaded .com .google .common .base .Preconditions ;
5152import org .apache .flink .calcite .shaded .com .google .common .base .Strings ;
@@ -139,15 +140,8 @@ public static void main(String[] args) throws Exception {
139140 }
140141
141142 ClassLoader threadClassLoader = Thread .currentThread ().getContextClassLoader ();
142- DtClassLoader dtClassLoader = new DtClassLoader (new URL []{}, threadClassLoader );
143- Thread .currentThread ().setContextClassLoader (dtClassLoader );
144-
145- URLClassLoader parentClassloader ;
146- if (!ClusterMode .local .name ().equals (deployMode )){
147- parentClassloader = (URLClassLoader ) threadClassLoader .getParent ();
148- }else {
149- parentClassloader = dtClassLoader ;
150- }
143+ DtClassLoader parentClassloader = new DtClassLoader (new URL []{}, threadClassLoader );
144+ Thread .currentThread ().setContextClassLoader (parentClassloader );
151145
152146 confProp = URLDecoder .decode (confProp , Charsets .UTF_8 .toString ());
153147 Properties confProperties = PluginUtil .jsonStrToObject (confProp , Properties .class );
@@ -218,7 +212,7 @@ public static void main(String[] args) throws Exception {
218212
219213 if (env instanceof MyLocalStreamEnvironment ) {
220214 List <URL > urlList = new ArrayList <>();
221- urlList .addAll (Arrays .asList (dtClassLoader .getURLs ()));
215+ urlList .addAll (Arrays .asList (parentClassloader .getURLs ()));
222216 ((MyLocalStreamEnvironment ) env ).setClasspaths (urlList );
223217 }
224218
@@ -254,7 +248,6 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
254248 if (classLoader == null ) {
255249 classLoader = FlinkUtil .loadExtraJar (jarURList , parentClassloader );
256250 }
257- classLoader .loadClass (funcInfo .getClassName ());
258251 FlinkUtil .registerUDF (funcInfo .getType (), funcInfo .getClassName (), funcInfo .getName (),
259252 tableEnv , classLoader );
260253 }
@@ -279,7 +272,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
279272 Table adaptTable = adaptSql == null ? table : tableEnv .sqlQuery (adaptSql );
280273
281274 RowTypeInfo typeInfo = new RowTypeInfo (adaptTable .getSchema ().getTypes (), adaptTable .getSchema ().getColumnNames ());
282- DataStream adaptStream = tableEnv .toAppendStream (adaptTable , typeInfo );
275+ DataStream adaptStream = tableEnv .toRetractStream (adaptTable , typeInfo )
276+ .map ((Tuple2 <Boolean , Row > f0 ) -> { return f0 .f1 ; })
277+ .returns (typeInfo );
278+
283279 String fields = String .join ("," , typeInfo .getFieldNames ());
284280
285281 if (waterMarkerAssigner .checkNeedAssignWaterMarker (sourceTableInfo )){
@@ -292,18 +288,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
292288 Table regTable = tableEnv .fromDataStream (adaptStream , fields );
293289 tableEnv .registerTable (tableInfo .getName (), regTable );
294290 registerTableCache .put (tableInfo .getName (), regTable );
295- classPathSet .add (PluginUtil .getRemoteJarFilePath (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , remoteSqlPluginPath ));
291+ classPathSet .add (PluginUtil .getRemoteJarFilePath (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , remoteSqlPluginPath , localSqlPluginPath ));
296292 } else if (tableInfo instanceof TargetTableInfo ) {
297293
298294 TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
299295 TypeInformation [] flinkTypes = FlinkUtil .transformTypes (tableInfo .getFieldClasses ());
300296 tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
301- classPathSet .add ( PluginUtil .getRemoteJarFilePath (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , remoteSqlPluginPath ));
297+ classPathSet .add ( PluginUtil .getRemoteJarFilePath (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , remoteSqlPluginPath , localSqlPluginPath ));
302298 } else if (tableInfo instanceof SideTableInfo ){
303299
304300 String sideOperator = ECacheType .ALL .name ().equals (((SideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
305301 sideTableMap .put (tableInfo .getName (), (SideTableInfo ) tableInfo );
306- classPathSet .add (PluginUtil .getRemoteSideJarFilePath (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , remoteSqlPluginPath ));
302+ classPathSet .add (PluginUtil .getRemoteSideJarFilePath (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , remoteSqlPluginPath , localSqlPluginPath ));
307303 }else {
308304 throw new RuntimeException ("not support table type:" + tableInfo .getType ());
309305 }
0 commit comments