|
42 | 42 | import org.apache.commons.cli.DefaultParser; |
43 | 43 | import org.apache.commons.cli.Options; |
44 | 44 | import org.apache.commons.io.Charsets; |
| 45 | +import org.apache.flink.api.common.ExecutionConfig; |
45 | 46 | import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
46 | 47 | import org.apache.flink.api.common.time.Time; |
47 | 48 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
52 | 53 | import org.apache.flink.calcite.shaded.com.google.common.collect.Maps; |
53 | 54 | import org.apache.flink.calcite.shaded.com.google.common.collect.Sets; |
54 | 55 | import org.apache.flink.client.program.ContextEnvironment; |
| 56 | +import org.apache.flink.configuration.Configuration; |
55 | 57 | import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
56 | 58 | import org.apache.flink.streaming.api.datastream.DataStream; |
57 | 59 | import org.apache.flink.streaming.api.environment.StreamContextEnvironment; |
|
67 | 69 | import java.io.IOException; |
68 | 70 | import java.lang.reflect.Field; |
69 | 71 | import java.lang.reflect.InvocationTargetException; |
| 72 | +import java.lang.reflect.Method; |
70 | 73 | import java.net.URL; |
71 | 74 | import java.net.URLClassLoader; |
72 | 75 | import java.net.URLDecoder; |
@@ -316,12 +319,33 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en |
316 | 319 | } |
317 | 320 | } |
318 | 321 |
|
319 | | - private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException { |
| 322 | + private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException { |
320 | 323 | StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ? |
321 | 324 | StreamExecutionEnvironment.getExecutionEnvironment() : |
322 | 325 | new MyLocalStreamEnvironment(); |
323 | 326 |
|
324 | 327 | env.setParallelism(FlinkUtil.getEnvParallelism(confProperties)); |
| 328 | + Configuration globalJobParameters = new Configuration(); |
| 329 | + Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class); |
| 330 | + method.setAccessible(true); |
| 331 | + |
| 332 | + confProperties.forEach((key,val) -> { |
| 333 | + try { |
| 334 | + method.invoke(globalJobParameters, key, val); |
| 335 | + } catch (IllegalAccessException e) { |
| 336 | + e.printStackTrace(); |
| 337 | + } catch (InvocationTargetException e) { |
| 338 | + e.printStackTrace(); |
| 339 | + } |
| 340 | + }); |
| 341 | + |
| 342 | + ExecutionConfig exeConfig = env.getConfig(); |
| 343 | + if(exeConfig.getGlobalJobParameters() == null){ |
| 344 | + exeConfig.setGlobalJobParameters(globalJobParameters); |
| 345 | + }else if(exeConfig.getGlobalJobParameters() instanceof Configuration){ |
| 346 | + ((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters); |
| 347 | + } |
| 348 | + |
325 | 349 |
|
326 | 350 | if(FlinkUtil.getMaxEnvParallelism(confProperties) > 0){ |
327 | 351 | env.setMaxParallelism(FlinkUtil.getMaxEnvParallelism(confProperties)); |
|
0 commit comments