2525import com .dtstack .flink .sql .parser .SqlParser ;
2626import com .dtstack .flink .sql .parser .SqlTree ;
2727import org .apache .flink .api .common .typeinfo .TypeInformation ;
28- import org .apache .flink .api .java .tuple .Tuple2 ;
2928import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3029import org .apache .flink .streaming .api .datastream .DataStream ;
3130import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
3231import org .apache .flink .table .api .*;
33- import org .apache .flink .table .api .internal .TableEnvironmentImpl ;
3432import org .apache .flink .table .api .java .StreamTableEnvironment ;
3533import org .apache .flink .table .api .java .internal .StreamTableEnvironmentImpl ;
3634import org .apache .flink .table .sinks .TableSink ;
37- import org .apache .flink .types .Row ;
3835
3936import com .dtstack .flink .sql .classloader .ClassLoaderManager ;
40- import com .dtstack .flink .sql .constrant .ConfigConstrant ;
4137import com .dtstack .flink .sql .enums .ClusterMode ;
4238import com .dtstack .flink .sql .enums .ECacheType ;
4339import com .dtstack .flink .sql .enums .EPluginLoadMode ;
7470import java .net .URL ;
7571import java .net .URLClassLoader ;
7672import java .net .URLDecoder ;
77- import java .util .Arrays ;
78- import java .util .List ;
79- import java .util .Map ;
80- import java .util .Properties ;
81- import java .util .Set ;
73+ import java .time .ZoneId ;
74+ import java .util .*;
8275
8376/**
8477 * 任务执行时的流程方法
@@ -92,6 +85,8 @@ public class ExecuteProcessHelper {
9285 private static final Logger LOG = LoggerFactory .getLogger (ExecuteProcessHelper .class );
9386 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
9487
88+ private static final String TIME_ZONE = "timezone" ;
89+
9590 public static FlinkPlanner flinkPlanner = new FlinkPlanner ();
9691
9792 public static ParamsInfo parseParams (String [] args ) throws Exception {
@@ -108,7 +103,6 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
108103 String remoteSqlPluginPath = options .getRemoteSqlPluginPath ();
109104 String pluginLoadMode = options .getPluginLoadMode ();
110105 String deployMode = options .getMode ();
111- String logLevel = options .getLogLevel ();
112106
113107 Preconditions .checkArgument (checkRemoteSqlPluginPath (remoteSqlPluginPath , deployMode , pluginLoadMode ),
114108 "Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required" );
@@ -152,7 +146,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
152146
153147
154148 SqlParser .setLocalSqlPluginRoot (paramsInfo .getLocalSqlPluginPath ());
155- SqlTree sqlTree = SqlParser .parseSql (paramsInfo .getSql ());
149+ SqlTree sqlTree = SqlParser .parseSql (paramsInfo .getSql (), paramsInfo . getPluginLoadMode () );
156150
157151 Map <String , AbstractSideTableInfo > sideTableMap = Maps .newHashMap ();
158152 Map <String , Table > registerTableCache = Maps .newHashMap ();
@@ -165,7 +159,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
165159 // cache classPathSets
166160 ExecuteProcessHelper .registerPluginUrlToCachedFile (env , classPathSets );
167161
168- ExecuteProcessHelper .sqlTranslation (paramsInfo .getLocalSqlPluginPath (), tableEnv , sqlTree , sideTableMap , registerTableCache );
162+ ExecuteProcessHelper .sqlTranslation (paramsInfo .getLocalSqlPluginPath (), paramsInfo . getPluginLoadMode (), tableEnv , sqlTree , sideTableMap , registerTableCache );
169163
170164 if (env instanceof MyLocalStreamEnvironment ) {
171165 ((MyLocalStreamEnvironment ) env ).setClasspaths (ClassLoaderManager .getClassPath ());
@@ -189,12 +183,14 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
189183 }
190184
191185 private static void sqlTranslation (String localSqlPluginPath ,
186+ String pluginLoadMode ,
192187 StreamTableEnvironment tableEnv ,
193188 SqlTree sqlTree ,Map <String , AbstractSideTableInfo > sideTableMap ,
194189 Map <String , Table > registerTableCache ) throws Exception {
195190
196191 SideSqlExec sideSqlExec = new SideSqlExec ();
197192 sideSqlExec .setLocalSqlPluginPath (localSqlPluginPath );
193+ sideSqlExec .setPluginLoadMode (pluginLoadMode );
198194
199195 int scope = 0 ;
200196 for (CreateTmpTableParser .SqlParserResult result : sqlTree .getTmpSqlList ()) {
@@ -278,7 +274,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
278274 if (tableInfo instanceof AbstractSourceTableInfo ) {
279275
280276 AbstractSourceTableInfo sourceTableInfo = (AbstractSourceTableInfo ) tableInfo ;
281- Table table = StreamSourceFactory .getStreamSource (sourceTableInfo , env , tableEnv , localSqlPluginPath );
277+ Table table = StreamSourceFactory .getStreamSource (sourceTableInfo , env , tableEnv , localSqlPluginPath , pluginLoadMode );
282278 tableEnv .registerTable (sourceTableInfo .getAdaptName (), table );
283279 //Note --- parameter conversion function can not be used inside a function of the type of polymerization
284280 //Create table in which the function is arranged only need adaptation sql
@@ -308,7 +304,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
308304 pluginClassPathSets .add (sourceTablePathUrl );
309305 } else if (tableInfo instanceof AbstractTargetTableInfo ) {
310306
311- TableSink tableSink = StreamSinkFactory .getTableSink ((AbstractTargetTableInfo ) tableInfo , localSqlPluginPath );
307+ TableSink tableSink = StreamSinkFactory .getTableSink ((AbstractTargetTableInfo ) tableInfo , localSqlPluginPath , pluginLoadMode );
312308 TypeInformation [] flinkTypes = FunctionManager .transformTypes (tableInfo .getFieldClasses ());
313309 tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
314310
@@ -324,6 +320,9 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
324320 throw new RuntimeException ("not support table type:" + tableInfo .getType ());
325321 }
326322 }
323+ if (localSqlPluginPath == null || localSqlPluginPath .isEmpty ()) {
324+ return Sets .newHashSet ();
325+ }
327326 return pluginClassPathSets ;
328327 }
329328
@@ -351,16 +350,28 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
351350 }
352351
353352
354- private static StreamTableEnvironment getStreamTableEnv (StreamExecutionEnvironment env , Properties confProperties ) {
353+ public static StreamTableEnvironment getStreamTableEnv (StreamExecutionEnvironment env , Properties confProperties ) {
355354 // use blink and streammode
356355 EnvironmentSettings settings = EnvironmentSettings .newInstance ()
357356 .useBlinkPlanner ()
358357 .inStreamingMode ()
359358 .build ();
360359
361- StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl .create (env , settings , new TableConfig ());
360+ TableConfig tableConfig = new TableConfig ();
361+
362+ timeZoneCheck (confProperties .getProperty (TIME_ZONE ));
363+
364+ tableConfig .setLocalTimeZone (ZoneId .of (confProperties .getProperty (TIME_ZONE )));
365+
366+ StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl .create (env , settings , tableConfig );
362367 StreamEnvConfigManager .streamTableEnvironmentStateTTLConfig (tableEnv , confProperties );
363368 return tableEnv ;
364369 }
365370
371+ private static void timeZoneCheck (String timeZone ) {
372+ ArrayList <String > zones = Lists .newArrayList (TimeZone .getAvailableIDs ());
373+ if (!zones .contains (timeZone )){
374+ throw new IllegalArgumentException (" timezone is Incorrect!" );
375+ }
376+ }
366377}
0 commit comments