|
23 | 23 | import com.dtstack.flink.sql.classloader.DtClassLoader; |
24 | 24 | import com.dtstack.flink.sql.enums.ECacheType; |
25 | 25 | import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment; |
26 | | -import com.dtstack.flink.sql.parser.*; |
| 26 | +import com.dtstack.flink.sql.exec.FlinkSQLExec; |
| 27 | +import com.dtstack.flink.sql.parser.CreateFuncParser; |
| 28 | +import com.dtstack.flink.sql.parser.CreateTmpTableParser; |
| 29 | +import com.dtstack.flink.sql.parser.InsertSqlParser; |
| 30 | +import com.dtstack.flink.sql.parser.SqlParser; |
| 31 | +import com.dtstack.flink.sql.parser.SqlTree; |
27 | 32 | import com.dtstack.flink.sql.side.SideSqlExec; |
28 | 33 | import com.dtstack.flink.sql.side.SideTableInfo; |
29 | 34 | import com.dtstack.flink.sql.table.SourceTableInfo; |
|
49 | 54 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
50 | 55 | import org.apache.flink.api.java.tuple.Tuple2; |
51 | 56 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
52 | | -import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions; |
53 | | -import org.apache.flink.calcite.shaded.com.google.common.base.Strings; |
54 | | -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; |
55 | | -import org.apache.flink.calcite.shaded.com.google.common.collect.Maps; |
56 | | -import org.apache.flink.calcite.shaded.com.google.common.collect.Sets; |
57 | 57 | import org.apache.flink.client.program.ContextEnvironment; |
58 | 58 | import org.apache.flink.configuration.Configuration; |
| 59 | +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; |
| 60 | +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; |
| 61 | +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; |
| 62 | +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; |
| 63 | +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; |
59 | 64 | import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
60 | 65 | import org.apache.flink.streaming.api.datastream.DataStream; |
61 | 66 | import org.apache.flink.streaming.api.environment.StreamContextEnvironment; |
| 67 | + |
62 | 68 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
63 | 69 | import org.apache.flink.table.api.Table; |
64 | 70 | import org.apache.flink.table.api.java.StreamTableEnvironment; |
| 71 | + |
65 | 72 | import org.apache.flink.table.sinks.TableSink; |
66 | 73 | import org.apache.flink.types.Row; |
67 | 74 | import org.slf4j.Logger; |
@@ -205,7 +212,10 @@ public static void main(String[] args) throws Exception { |
205 | 212 | //sql-dimensional table contains the dimension table of execution |
206 | 213 | sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache); |
207 | 214 | }else{ |
208 | | - tableEnv.sqlUpdate(result.getExecSql()); |
| 215 | + FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql()); |
| 216 | + if(LOG.isInfoEnabled()){ |
| 217 | + LOG.info("exec sql: " + result.getExecSql()); |
| 218 | + } |
209 | 219 | } |
210 | 220 | } |
211 | 221 | } |
@@ -288,6 +298,9 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en |
288 | 298 |
|
289 | 299 | Table regTable = tableEnv.fromDataStream(adaptStream, fields); |
290 | 300 | tableEnv.registerTable(tableInfo.getName(), regTable); |
| 301 | + if(LOG.isInfoEnabled()){ |
| 302 | + LOG.info("registe table {} success.", tableInfo.getName()); |
| 303 | + } |
291 | 304 | registerTableCache.put(tableInfo.getName(), regTable); |
292 | 305 | classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath)); |
293 | 306 | } else if (tableInfo instanceof TargetTableInfo) { |
@@ -320,7 +333,6 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert |
320 | 333 | StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ? |
321 | 334 | StreamExecutionEnvironment.getExecutionEnvironment() : |
322 | 335 | new MyLocalStreamEnvironment(); |
323 | | - |
324 | 336 | env.getConfig().disableClosureCleaner(); |
325 | 337 | env.setParallelism(FlinkUtil.getEnvParallelism(confProperties)); |
326 | 338 | Configuration globalJobParameters = new Configuration(); |
|
0 commit comments