Skip to content

Commit 076bb41

Browse files
author
xuchao
committed
1:修改统一的Planner的获取方式
2:修改asyncOperator timer 的注册方式
1 parent 09b84a1 commit 076bb41

11 files changed

Lines changed: 104 additions & 60 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@
2929
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3030
import org.apache.flink.streaming.api.datastream.DataStream;
3131
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
32-
import org.apache.flink.table.api.EnvironmentSettings;
33-
import org.apache.flink.table.api.StreamQueryConfig;
34-
import org.apache.flink.table.api.Table;
35-
import org.apache.flink.table.api.TableEnvironment;
32+
import org.apache.flink.table.api.*;
33+
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
3634
import org.apache.flink.table.api.java.StreamTableEnvironment;
35+
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
3736
import org.apache.flink.table.sinks.TableSink;
3837
import org.apache.flink.types.Row;
3938

@@ -93,6 +92,7 @@ public class ExecuteProcessHelper {
9392
private static final Logger LOG = LoggerFactory.getLogger(ExecuteProcessHelper.class);
9493
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
9594

95+
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
9696

9797
public static ParamsInfo parseParams(String[] args) throws Exception {
9898
LOG.info("------------program params-------------------------");
@@ -150,6 +150,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
150150
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
151151
StreamTableEnvironment tableEnv = getStreamTableEnv(env, paramsInfo.getConfProp());
152152

153+
153154
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
154155
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
155156

@@ -211,7 +212,7 @@ private static void sqlTranslation(String localSqlPluginPath,
211212
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
212213
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
213214

214-
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
215+
SqlNode sqlNode = flinkPlanner.getParser().parse(realSql);
215216
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
216217
tmp.setExecSql(tmpSql);
217218
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, tmp, scope + "");
@@ -357,7 +358,7 @@ private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironme
357358
.inStreamingMode()
358359
.build();
359360

360-
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
361+
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());
361362
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
362363
return tableEnv;
363364
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,12 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24-
import org.apache.calcite.config.Lex;
2524
import org.apache.calcite.sql.SqlBasicCall;
2625
import org.apache.calcite.sql.SqlJoin;
2726
import org.apache.calcite.sql.SqlKind;
2827
import org.apache.calcite.sql.SqlNode;
2928
import org.apache.calcite.sql.SqlSelect;
30-
import org.apache.calcite.sql.parser.SqlParseException;
31-
import org.apache.calcite.sql.parser.SqlParser;
3229
import com.google.common.collect.Lists;
33-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
34-
3530
import java.util.List;
3631
import java.util.regex.Matcher;
3732
import java.util.regex.Pattern;
@@ -55,6 +50,8 @@ public class CreateTmpTableParser implements IParser {
5550

5651
private static final Pattern EMPTYVIEW = Pattern.compile(EMPTY_STR);
5752

53+
private FlinkPlanner flinkPlanner = new FlinkPlanner();
54+
5855
public static CreateTmpTableParser newInstance(){
5956
return new CreateTmpTableParser();
6057
}
@@ -77,11 +74,10 @@ public void parseSql(String sql, SqlTree sqlTree) {
7774
tableName = matcher.group(1);
7875
selectSql = "select " + matcher.group(2);
7976
}
80-
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
8177

8278
SqlNode sqlNode = null;
8379
try {
84-
sqlNode = flinkPlanner.parse(selectSql);
80+
sqlNode = flinkPlanner.getParser().parse(selectSql);
8581
} catch (Exception e) {
8682
throw new RuntimeException("", e);
8783
}

core/src/main/java/com/dtstack/flink/sql/parser/FlinkPlanner.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,58 @@
1818

1919
package com.dtstack.flink.sql.parser;
2020

21-
import org.apache.calcite.plan.RelOptPlanner;
22-
import org.apache.calcite.tools.FrameworkConfig;
23-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
24-
import org.apache.flink.table.calcite.FlinkTypeFactory;
21+
import org.apache.flink.table.api.EnvironmentSettings;
22+
import org.apache.flink.table.api.SqlDialect;
23+
import org.apache.flink.table.api.TableConfig;
24+
import org.apache.flink.table.catalog.Catalog;
25+
import org.apache.flink.table.catalog.CatalogManager;
26+
import org.apache.flink.table.catalog.FunctionCatalog;
27+
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
28+
import org.apache.flink.table.module.ModuleManager;
29+
import org.apache.flink.table.planner.calcite.CalciteParser;
30+
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
31+
import org.apache.flink.table.planner.delegation.PlannerContext;
32+
33+
import java.util.ArrayList;
34+
35+
import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
2536

2637
/**
38+
* 废弃。之后删除
2739
* Date: 2020/3/31
2840
* Company: www.dtstack.com
2941
* @author maqi
3042
*/
3143
public class FlinkPlanner {
3244

33-
public static volatile FlinkPlannerImpl flinkPlanner;
45+
private final TableConfig tableConfig = new TableConfig();
46+
47+
private final Catalog catalog = new GenericInMemoryCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
48+
EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
49+
private final CatalogManager catalogManager =
50+
new CatalogManager("builtin", catalog);
51+
private final ModuleManager moduleManager = new ModuleManager();
52+
private final FunctionCatalog functionCatalog = new FunctionCatalog(
53+
tableConfig,
54+
catalogManager,
55+
moduleManager);
56+
private final PlannerContext plannerContext =
57+
new PlannerContext(tableConfig,
58+
functionCatalog,
59+
catalogManager,
60+
asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
61+
new ArrayList<>());
62+
3463

35-
private FlinkPlanner() {
64+
public FlinkPlanner() {
3665
}
3766

38-
public static FlinkPlannerImpl createFlinkPlanner(FrameworkConfig frameworkConfig, RelOptPlanner relOptPlanner, FlinkTypeFactory typeFactory) {
39-
if (flinkPlanner == null) {
40-
synchronized (FlinkPlanner.class) {
41-
if (flinkPlanner == null) {
42-
flinkPlanner = new FlinkPlannerImpl(frameworkConfig, relOptPlanner, typeFactory);
43-
}
44-
}
45-
}
46-
return flinkPlanner;
67+
public CalciteParser getParser(){
68+
return getParserBySqlDialect(SqlDialect.DEFAULT);
4769
}
4870

49-
public static FlinkPlannerImpl getFlinkPlanner() {
50-
return flinkPlanner;
71+
public CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
72+
tableConfig.setSqlDialect(sqlDialect);
73+
return plannerContext.createCalciteParser();
5174
}
5275
}

core/src/main/java/com/dtstack/flink/sql/parser/IParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,5 @@ public interface IParser {
4141
* @param sql
4242
* @param sqlTree
4343
*/
44-
void parseSql(String sql, SqlTree sqlTree);
44+
void parseSql(String sql, SqlTree sqlTree) throws Exception;
4545
}

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.calcite.sql.parser.SqlParser;
3434
import org.apache.commons.lang3.StringUtils;
3535
import com.google.common.collect.Lists;
36-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
36+
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
3737

3838
import java.util.List;
3939

@@ -49,6 +49,8 @@
4949

5050
public class InsertSqlParser implements IParser {
5151

52+
private FlinkPlanner flinkPlanner = new FlinkPlanner();
53+
5254
@Override
5355
public boolean verify(String sql) {
5456
return StringUtils.isNotBlank(sql) && sql.trim().toLowerCase().startsWith("insert");
@@ -60,10 +62,10 @@ public static InsertSqlParser newInstance(){
6062
}
6163

6264
@Override
63-
public void parseSql(String sql, SqlTree sqlTree) {
65+
public void parseSql(String sql, SqlTree sqlTree) throws Exception {
66+
6467

65-
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
66-
SqlNode sqlNode = flinkPlanner.parse(sql);
68+
SqlNode sqlNode = flinkPlanner.getParser().parse(sql);
6769

6870
SqlParseResult sqlParseResult = new SqlParseResult();
6971
parseNode(sqlNode, sqlParseResult);

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
2727
import com.dtstack.flink.sql.side.cache.CacheObj;
2828
import com.dtstack.flink.sql.side.cache.LRUSideCache;
29+
import com.dtstack.flink.sql.util.ReflectionUtils;
2930
import com.google.common.collect.Lists;
3031
import com.google.common.collect.Maps;
3132
import org.apache.calcite.sql.JoinType;
@@ -37,13 +38,16 @@
3738
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3839
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3940
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
41+
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
4042
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
4143
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
4244
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4345
import org.apache.flink.types.Row;
4446
import org.slf4j.Logger;
4547
import org.slf4j.LoggerFactory;
4648

49+
import java.lang.reflect.InvocationTargetException;
50+
import java.lang.reflect.Method;
4751
import java.sql.Timestamp;
4852
import java.time.LocalDateTime;
4953
import java.util.Collections;
@@ -169,9 +173,9 @@ public void timeout(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>>
169173
resultFuture.complete(null);
170174
}
171175

172-
protected void preInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
173-
ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
174-
cancelTimerWhenComplete(resultFuture, timeFuture);
176+
protected void preInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture)
177+
throws InvocationTargetException, IllegalAccessException {
178+
registerTimerAndAddToHandler(input, resultFuture);
175179
}
176180

177181
@Override
@@ -261,16 +265,15 @@ public void onProcessingTime(long timestamp) throws Exception {
261265
});
262266
}
263267

264-
protected void cancelTimerWhenComplete(ResultFuture<Tuple2<Boolean,Row>> resultFuture, ScheduledFuture<?> timerFuture){
265-
ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
266-
if(resultFuture instanceof StreamRecordQueueEntry){
267-
StreamRecordQueueEntry streamRecordBufferEntry = (StreamRecordQueueEntry) resultFuture;
268-
streamRecordBufferEntry.onComplete((Object value) -> {
269-
timerFuture.cancel(true);
270-
},executors);
271-
}
268+
protected void registerTimerAndAddToHandler(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture)
269+
throws InvocationTargetException, IllegalAccessException {
270+
ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
271+
// resultFuture 是ResultHandler 的实例
272+
Method setTimeoutTimer = ReflectionUtils.getDeclaredMethod(resultFuture, "setTimeoutTimer", ScheduledFuture.class);
273+
setTimeoutTimer.invoke(resultFuture, timeFuture);
272274
}
273275

276+
274277
protected void dealFillDataError(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture, Throwable e) {
275278
parseErrorRecords.inc();
276279
if(parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)){

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@
3939
import org.apache.calcite.sql.SqlOperator;
4040
import org.apache.calcite.sql.fun.SqlCase;
4141
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
42+
import org.apache.calcite.sql.parser.SqlParser;
4243
import org.apache.calcite.sql.parser.SqlParserPos;
4344
import org.apache.commons.collections.CollectionUtils;
4445
import org.apache.commons.lang3.StringUtils;
4546
import org.apache.flink.api.java.tuple.Tuple2;
46-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
4747

4848
import java.util.List;
4949
import java.util.Map;
@@ -67,6 +67,8 @@ public class JoinNodeDealer {
6767

6868
private SideSQLParser sideSQLParser;
6969

70+
private FlinkPlanner flinkPlanner = new FlinkPlanner();
71+
7072
public JoinNodeDealer(SideSQLParser sideSQLParser){
7173
this.sideSQLParser = sideSQLParser;
7274
}
@@ -426,8 +428,7 @@ private void extractTemporaryQuery(SqlNode node, String tableAlias,
426428
node.toString(),
427429
extractConditionStr);
428430

429-
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
430-
SqlNode sqlNode = flinkPlanner.parse(tmpSelectSql);
431+
SqlNode sqlNode = flinkPlanner.getParser().parse(tmpSelectSql);
431432

432433
SqlBasicCall sqlBasicCall = buildAsSqlNode(tableAlias, sqlNode);
433434
queueInfo.offer(sqlBasicCall);

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.calcite.sql.SqlSelect;
3232
import org.apache.calcite.sql.parser.SqlParseException;
3333
import org.apache.commons.lang3.StringUtils;
34-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
3534

3635
import java.util.List;
3736
import java.util.Map;
@@ -46,9 +45,12 @@
4645
* @author maqi
4746
*/
4847
public class SidePredicatesParser {
48+
49+
private FlinkPlanner flinkPlanner = new FlinkPlanner();
50+
4951
public void fillPredicatesForSideTable(String exeSql, Map<String, AbstractSideTableInfo> sideTableMap) throws SqlParseException {
50-
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
51-
SqlNode sqlNode = flinkPlanner.parse(exeSql);
52+
53+
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(exeSql).parseStmt();
5254
parseSql(sqlNode, sideTableMap, Maps.newHashMap());
5355
}
5456

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@
3737
import org.apache.calcite.sql.SqlWith;
3838
import org.apache.calcite.sql.SqlWithItem;
3939
import org.apache.calcite.sql.parser.SqlParseException;
40+
import org.apache.calcite.sql.parser.SqlParser;
4041
import org.apache.flink.api.java.tuple.Tuple2;
4142
import org.apache.flink.table.api.Table;
42-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
43+
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
4546

@@ -62,14 +63,15 @@ public class SideSQLParser {
6263

6364
private Map<String, Table> localTableCache = Maps.newHashMap();
6465

66+
private FlinkPlanner flinkPlanner = new FlinkPlanner();
67+
6568
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String scope) throws SqlParseException {
6669

6770
LOG.info("----------exec original Sql----------");
6871
LOG.info(exeSql);
6972

7073
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
71-
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
72-
SqlNode sqlNode = flinkPlanner.parse(exeSql);
74+
SqlNode sqlNode = flinkPlanner.getParser().parse(exeSql);
7375

7476
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
7577
queueInfo.offer(sqlNode);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@
2626
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2727
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
2828
import org.apache.flink.streaming.api.datastream.DataStream;
29+
import org.apache.flink.table.api.EnvironmentSettings;
2930
import org.apache.flink.table.api.Table;
3031
import org.apache.flink.table.api.TableSchema;
3132
import org.apache.flink.table.api.java.StreamTableEnvironment;
33+
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
34+
import org.apache.flink.table.catalog.CatalogManager;
35+
import org.apache.flink.table.catalog.ObjectIdentifier;
3236
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3337
import org.apache.flink.types.Row;
3438

@@ -433,9 +437,15 @@ private void joinFun(Object pollObj,
433437
replaceInfo.setTargetTableName(targetTableName);
434438
replaceInfo.setTargetTableAlias(targetTableAlias);
435439

436-
if (!tableEnv.isRegistered(targetTableName)){
440+
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
441+
EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
442+
EnvironmentSettings.DEFAULT_BUILTIN_DATABASE,
443+
targetTableName);
444+
boolean tableExists = tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG).get().tableExists(objectIdentifier.toObjectPath());
445+
446+
if (!tableExists){
437447
Table joinTable = tableEnv.fromDataStream(dsOut);
438-
tableEnv.registerTable(targetTableName, joinTable);
448+
tableEnv.createTemporaryView(targetTableName, joinTable);
439449
localTableCache.put(joinInfo.getNewTableName(), joinTable);
440450
}
441451
}

0 commit comments

Comments
 (0)