5353import org .apache .calcite .sql .SqlInsert ;
5454import org .apache .calcite .sql .SqlNode ;
5555import org .apache .commons .io .Charsets ;
56+ import org .apache .commons .lang3 .SerializationUtils ;
5657import org .apache .commons .lang3 .StringUtils ;
5758import org .apache .flink .api .common .typeinfo .TypeInformation ;
5859import org .apache .flink .api .java .typeutils .RowTypeInfo ;
7576import java .net .URLClassLoader ;
7677import java .net .URLDecoder ;
7778import java .time .ZoneId ;
78- import java .util .ArrayList ;
79- import java .util .Arrays ;
80- import java .util .List ;
81- import java .util .Map ;
82- import java .util .Properties ;
83- import java .util .Set ;
84- import java .util .TimeZone ;
79+ import java .util .*;
8580import java .util .stream .Stream ;
8681
8782/**
@@ -215,7 +210,11 @@ private static void sqlTranslation(String localSqlPluginPath,
215210 scope ++;
216211 }
217212
213+ final Map <String , AbstractSideTableInfo > tmpTableMap = new HashMap <>();
218214 for (InsertSqlParser .SqlParseResult result : sqlTree .getExecSqlList ()) {
215+ // prevent current sql use last sql's sideTableInfo
216+ sideTableMap .forEach ((s , abstractSideTableInfo ) -> tmpTableMap .put (s , SerializationUtils .clone (abstractSideTableInfo )));
217+
219218 if (LOG .isInfoEnabled ()) {
220219 LOG .info ("exe-sql:\n " + result .getExecSql ());
221220 }
@@ -228,17 +227,17 @@ private static void sqlTranslation(String localSqlPluginPath,
228227 SqlNode sqlNode = flinkPlanner .getParser ().parse (realSql );
229228 String tmpSql = ((SqlInsert ) sqlNode ).getSource ().toString ();
230229 tmp .setExecSql (tmpSql );
231- sideSqlExec .exec (tmp .getExecSql (), sideTableMap , tableEnv , registerTableCache , tmp , scope + "" );
230+ sideSqlExec .exec (tmp .getExecSql (), tmpTableMap , tableEnv , registerTableCache , tmp , scope + "" );
232231 } else {
233232 for (String sourceTable : result .getSourceTableList ()) {
234- if (sideTableMap .containsKey (sourceTable )) {
233+ if (tmpTableMap .containsKey (sourceTable )) {
235234 isSide = true ;
236235 break ;
237236 }
238237 }
239238 if (isSide ) {
240239 //sql-dimensional table contains the dimension table of execution
241- sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache , null , String .valueOf (scope ));
240+ sideSqlExec .exec (result .getExecSql (), tmpTableMap , tableEnv , registerTableCache , null , String .valueOf (scope ));
242241 } else {
243242 LOG .info ("----------exec sql without dimension join-----------" );
244243 LOG .info ("----------real sql exec is--------------------------\n {}" , result .getExecSql ());
@@ -251,26 +250,17 @@ private static void sqlTranslation(String localSqlPluginPath,
251250
252251 scope ++;
253252 }
253+ tmpTableMap .clear ();
254254 }
255255 }
256256
257257 public static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv , boolean getPlan )
258258 throws IllegalAccessException , InvocationTargetException {
259259 // udf和tableEnv须由同一个类加载器加载
260- ClassLoader levelClassLoader = tableEnv .getClass ().getClassLoader ();
261260 ClassLoader currentClassLoader = Thread .currentThread ().getContextClassLoader ();
262- URLClassLoader classLoader = null ;
261+ URLClassLoader classLoader = ClassLoaderManager . loadExtraJar ( jarUrlList , ( URLClassLoader ) currentClassLoader ) ;
263262 List <CreateFuncParser .SqlParserResult > funcList = sqlTree .getFunctionList ();
264263 for (CreateFuncParser .SqlParserResult funcInfo : funcList ) {
265- // 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
266- if (getPlan ) {
267- classLoader = ClassLoaderManager .loadExtraJar (jarUrlList , (URLClassLoader ) currentClassLoader );
268- }
269-
270- //classloader
271- if (classLoader == null ) {
272- classLoader = ClassLoaderManager .loadExtraJar (jarUrlList , (URLClassLoader ) levelClassLoader );
273- }
274264 FunctionManager .registerUDF (funcInfo .getType (), funcInfo .getClassName (), funcInfo .getName (), tableEnv , classLoader );
275265 }
276266 }
0 commit comments