|
75 | 75 | import org.slf4j.LoggerFactory; |
76 | 76 |
|
77 | 77 | import java.io.File; |
78 | | -import java.io.IOException; |
79 | 78 | import java.lang.reflect.Field; |
80 | 79 | import java.lang.reflect.InvocationTargetException; |
81 | 80 | import java.lang.reflect.Method; |
@@ -251,14 +250,14 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla |
251 | 250 | private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader, |
252 | 251 | StreamTableEnvironment tableEnv) |
253 | 252 | throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { |
254 | | - //register urf |
255 | | - URLClassLoader classLoader = null; |
256 | 253 | List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList(); |
| 254 | + if (funcList.isEmpty()) { |
| 255 | + return; |
| 256 | + } |
| 257 | + //load jar |
| 258 | + URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader); |
| 259 | + //register urf |
257 | 260 | for (CreateFuncParser.SqlParserResult funcInfo : funcList) { |
258 | | - //classloader |
259 | | - if (classLoader == null) { |
260 | | - classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader); |
261 | | - } |
262 | 261 | FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), |
263 | 262 | tableEnv, classLoader); |
264 | 263 | } |
@@ -329,25 +328,20 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en |
329 | 328 | } |
330 | 329 | } |
331 | 330 |
|
332 | | - private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException { |
| 331 | + private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception { |
333 | 332 | StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ? |
334 | 333 | StreamExecutionEnvironment.getExecutionEnvironment() : |
335 | 334 | new MyLocalStreamEnvironment(); |
336 | 335 | env.getConfig().disableClosureCleaner(); |
337 | 336 | env.setParallelism(FlinkUtil.getEnvParallelism(confProperties)); |
| 337 | + |
338 | 338 | Configuration globalJobParameters = new Configuration(); |
| 339 | + //Configuration unsupported set properties key-value |
339 | 340 | Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class); |
340 | 341 | method.setAccessible(true); |
341 | | - |
342 | | - confProperties.forEach((key,val) -> { |
343 | | - try { |
344 | | - method.invoke(globalJobParameters, key, val); |
345 | | - } catch (IllegalAccessException e) { |
346 | | - e.printStackTrace(); |
347 | | - } catch (InvocationTargetException e) { |
348 | | - e.printStackTrace(); |
349 | | - } |
350 | | - }); |
| 342 | + for (Map.Entry<Object, Object> prop : confProperties.entrySet()) { |
| 343 | + method.invoke(globalJobParameters, prop.getKey(), prop.getValue()); |
| 344 | + } |
351 | 345 |
|
352 | 346 | ExecutionConfig exeConfig = env.getConfig(); |
353 | 347 | if(exeConfig.getGlobalJobParameters() == null){ |
|
0 commit comments