|
27 | 27 | import org.apache.flink.table.api.internal.TableImpl; |
28 | 28 | import org.apache.flink.table.api.java.StreamTableEnvironment; |
29 | 29 | import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl; |
| 30 | +import org.apache.flink.table.catalog.CatalogManager; |
| 31 | +import org.apache.flink.table.catalog.ObjectIdentifier; |
| 32 | +import org.apache.flink.table.operations.Operation; |
30 | 33 | import org.apache.flink.table.operations.QueryOperation; |
31 | 34 | import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; |
32 | 35 | import org.apache.flink.table.planner.delegation.PlannerBase; |
33 | 36 | import org.apache.flink.table.planner.delegation.StreamPlanner; |
34 | | -import org.apache.flink.table.planner.operations.PlannerQueryOperation; |
35 | 37 | import org.apache.flink.table.planner.operations.SqlToOperationConverter; |
36 | 38 | import org.apache.flink.table.sinks.TableSink; |
37 | 39 | import org.slf4j.Logger; |
38 | 40 | import org.slf4j.LoggerFactory; |
39 | 41 | import scala.Option; |
| 42 | +import scala.Tuple2; |
40 | 43 |
|
| 44 | +import java.lang.reflect.Constructor; |
41 | 45 | import java.lang.reflect.InvocationTargetException; |
42 | 46 | import java.lang.reflect.Method; |
43 | | -import java.util.Arrays; |
44 | | -import java.util.List; |
45 | 47 |
|
46 | 48 |
|
47 | 49 | /** |
|
51 | 53 | */ |
52 | 54 | public class FlinkSQLExec { |
53 | 55 | private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class); |
| 56 | + |
54 | 57 | public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception { |
55 | 58 | StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv); |
56 | 59 | StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner(); |
57 | 60 | FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner(); |
58 | 61 |
|
59 | | - RichSqlInsert insert = (RichSqlInsert)flinkPlanner.parse(stmt); |
| 62 | + RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt)); |
60 | 63 | TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert); |
61 | 64 |
|
62 | 65 | String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0); |
@@ -93,19 +96,28 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw |
93 | 96 |
|
94 | 97 | } |
95 | 98 |
|
| 99 | + private static SqlToOperationConverter createSqlToOperationConverter(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager) |
| 100 | + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { |
| 101 | + |
| 102 | + Constructor<SqlToOperationConverter> declaredConstructor = SqlToOperationConverter.class.getDeclaredConstructor(FlinkPlannerImpl.class, CatalogManager.class); |
| 103 | + declaredConstructor.setAccessible(true); |
| 104 | + SqlToOperationConverter sqlToOperationConverter = declaredConstructor.newInstance(flinkPlanner, catalogManager); |
| 105 | + return sqlToOperationConverter; |
| 106 | + } |
| 107 | + |
96 | 108 | private static TableSink getTableSinkByPlanner(StreamPlanner streamPlanner, String targetTableName) |
97 | 109 | throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { |
98 | | - Method getTableSink = PlannerBase.class.getDeclaredMethod("getTableSink", List.class); |
| 110 | + Method getTableSink = PlannerBase.class.getDeclaredMethod("getTableSink", ObjectIdentifier.class); |
99 | 111 | getTableSink.setAccessible(true); |
100 | | - Option tableSinkOption = (Option) getTableSink.invoke(streamPlanner, Arrays.asList(targetTableName)); |
101 | | - return (TableSink) tableSinkOption.get(); |
| 112 | + ObjectIdentifier objectIdentifier = ObjectIdentifier.of(streamPlanner.catalogManager().getCurrentCatalog(), streamPlanner.catalogManager().getCurrentDatabase(), targetTableName); |
| 113 | + Option tableSinkOption = (Option) getTableSink.invoke(streamPlanner, objectIdentifier); |
| 114 | + return (TableSink) ((Tuple2) tableSinkOption.get())._2; |
102 | 115 | } |
103 | 116 |
|
104 | 117 | private static TableImpl extractQueryTableFromInsertCaluse(StreamTableEnvironmentImpl tableEnvImpl, FlinkPlannerImpl flinkPlanner, RichSqlInsert insert) |
105 | 118 | throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { |
106 | | - |
107 | | - PlannerQueryOperation queryOperation = (PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner, |
108 | | - insert.getSource()); |
| 119 | + StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner(); |
| 120 | + Operation queryOperation = SqlToOperationConverter.convert(flinkPlanner, streamPlanner.catalogManager(), insert.getSource()).get(); |
109 | 121 | Method createTableMethod = TableEnvironmentImpl.class.getDeclaredMethod("createTable", QueryOperation.class); |
110 | 122 | createTableMethod.setAccessible(true); |
111 | 123 | return (TableImpl) createTableMethod.invoke(tableEnvImpl, queryOperation); |
|
0 commit comments