|
70 | 70 | import org.slf4j.Logger; |
71 | 71 | import org.slf4j.LoggerFactory; |
72 | 72 | import java.io.File; |
73 | | -import java.io.IOException; |
74 | 73 | import java.lang.reflect.Field; |
75 | 74 | import java.lang.reflect.InvocationTargetException; |
76 | 75 | import java.lang.reflect.Method; |
@@ -225,14 +224,14 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla |
225 | 224 | private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader, |
226 | 225 | StreamTableEnvironment tableEnv) |
227 | 226 | throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { |
228 | | - //register urf |
229 | | - URLClassLoader classLoader = null; |
230 | 227 | List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList(); |
| 228 | + if (funcList.isEmpty()) { |
| 229 | + return; |
| 230 | + } |
| 231 | + //load jar |
| 232 | + URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader); |
| 233 | + //register urf |
231 | 234 | for (CreateFuncParser.SqlParserResult funcInfo : funcList) { |
232 | | - //classloader |
233 | | - if (classLoader == null) { |
234 | | - classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader); |
235 | | - } |
236 | 235 | FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), |
237 | 236 | tableEnv, classLoader); |
238 | 237 | } |
@@ -303,22 +302,21 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en |
303 | 302 | } |
304 | 303 | } |
305 | 304 |
|
306 | | - private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException { |
| 305 | + private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception { |
307 | 306 | StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ? |
308 | 307 | StreamExecutionEnvironment.getExecutionEnvironment() : |
309 | 308 | new MyLocalStreamEnvironment(); |
310 | 309 | env.getConfig().disableClosureCleaner(); |
311 | 310 | env.setParallelism(FlinkUtil.getEnvParallelism(confProperties)); |
| 311 | + |
312 | 312 | Configuration globalJobParameters = new Configuration(); |
| 313 | + //Configuration unsupported set properties key-value |
313 | 314 | Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class); |
314 | 315 | method.setAccessible(true); |
315 | | - confProperties.forEach((key,val) -> { |
316 | | - try { |
317 | | - method.invoke(globalJobParameters, key, val); |
318 | | - } catch (Exception e) { |
319 | | - LOG.error("set Configuration key:{},value:{} error:{}",key,val,e); |
320 | | - } |
321 | | - }); |
| 316 | + for (Map.Entry<Object, Object> prop : confProperties.entrySet()) { |
| 317 | + method.invoke(globalJobParameters, prop.getKey(), prop.getValue()); |
| 318 | + } |
| 319 | + |
322 | 320 | ExecutionConfig exeConfig = env.getConfig(); |
323 | 321 | if(exeConfig.getGlobalJobParameters() == null){ |
324 | 322 | exeConfig.setGlobalJobParameters(globalJobParameters); |
|
0 commit comments