Skip to content

Commit bd5953c

Browse files
author
dapeng
committed
Merge branch '1.8_release_3.10.x' into 1.8_3.10_zy
2 parents 63474ac + 5296857 commit bd5953c

6 files changed

Lines changed: 75 additions & 36 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,14 +198,18 @@ private static void sqlTranslation(String localSqlPluginPath,
198198

199199
SideSqlExec sideSqlExec = new SideSqlExec();
200200
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
201+
202+
int scope = 0;
201203
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
202-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
204+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result, scope + "");
205+
scope++;
203206
}
204207

205208
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
206209
if (LOG.isInfoEnabled()) {
207210
LOG.info("exe-sql:\n" + result.getExecSql());
208211
}
212+
209213
boolean isSide = false;
210214
for (String tableName : result.getTargetTableList()) {
211215
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
@@ -216,7 +220,7 @@ private static void sqlTranslation(String localSqlPluginPath,
216220
SqlNode sqlNode = flinkPlanner.parse(realSql);
217221
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
218222
tmp.setExecSql(tmpSql);
219-
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
223+
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp, scope + "");
220224
} else {
221225
for (String sourceTable : result.getSourceTableList()) {
222226
if (sideTableMap.containsKey(sourceTable)) {
@@ -226,7 +230,7 @@ private static void sqlTranslation(String localSqlPluginPath,
226230
}
227231
if (isSide) {
228232
//sql-dimensional table contains the dimension table of execution
229-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
233+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
230234
} else {
231235
LOG.info("----------exec sql without dimension join-----------");
232236
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -236,6 +240,8 @@ private static void sqlTranslation(String localSqlPluginPath,
236240
}
237241
}
238242
}
243+
244+
scope++;
239245
}
240246
}
241247
}

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23+
import com.dtstack.flink.sql.util.TableUtils;
2324
import com.google.common.collect.HashBasedTable;
2425
import com.google.common.collect.Maps;
2526
import org.apache.calcite.sql.JoinType;
@@ -66,6 +67,8 @@ public class JoinInfo implements Serializable {
6667

6768
private JoinType joinType;
6869

70+
private String scope = "";
71+
6972
/**
7073
* 左表需要查询的字段信息和output的时候对应的列名称
7174
*/
@@ -96,12 +99,14 @@ public String getNewTableName(){
9699
//兼容左边表是as 的情况
97100
String leftStr = leftTableName;
98101
leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
99-
return leftStr + "_" + rightTableName;
102+
String newName = leftStr + "_" + rightTableName;
103+
return TableUtils.buildTableNameWithScope(newName, scope);
100104
}
101105

102106

103107
public String getNewTableAlias(){
104-
return leftTableAlias + "_" + rightTableAlias;
108+
String newName = leftTableAlias + "_" + rightTableAlias;
109+
return TableUtils.buildTableNameWithScope(newName, scope);
105110
}
106111

107112
public boolean isLeftIsSideTable() {
@@ -233,6 +238,14 @@ public HashBasedTable<String, String, String> getTableFieldRef(){
233238
return mappingTable;
234239
}
235240

241+
public String getScope() {
242+
return scope;
243+
}
244+
245+
public void setScope(String scope) {
246+
this.scope = scope;
247+
}
248+
236249
@Override
237250
public String toString() {
238251
return "JoinInfo{" +

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
9090
SqlNodeList parentGroupByList,
9191
Set<Tuple2<String, String>> joinFieldSet,
9292
Map<String, String> tableRef,
93-
Map<String, String> fieldRef) {
93+
Map<String, String> fieldRef,
94+
String scope) {
9495

9596
SqlNode leftNode = joinNode.getLeft();
9697
SqlNode rightNode = joinNode.getRight();
@@ -106,13 +107,14 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
106107

107108
if (leftNode.getKind() == JOIN) {
108109
//处理连续join
109-
dealNestJoin(joinNode, sideTableSet,
110-
queueInfo, parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef);
110+
dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
111+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
111112
leftNode = joinNode.getLeft();
112113
}
113114

114115
if (leftNode.getKind() == AS) {
115-
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
116+
AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
117+
parentWhere, parentSelectList, parentGroupByList, scope);
116118
leftTbName = aliasInfo.getName();
117119
leftTbAlias = aliasInfo.getAlias();
118120
} else if(leftNode.getKind() == IDENTIFIER){
@@ -123,7 +125,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
123125
boolean leftIsSide = checkIsSideTable(leftTbName, sideTableSet);
124126
Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");
125127

126-
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
128+
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
129+
parentWhere, parentSelectList, parentGroupByList, scope);
127130
rightTableName = rightTableNameAndAlias.f0;
128131
rightTableAlias = rightTableNameAndAlias.f1;
129132

@@ -146,6 +149,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
146149
tableInfo.setRightNode(rightNode);
147150
tableInfo.setJoinType(joinType);
148151
tableInfo.setCondition(joinNode.getCondition());
152+
tableInfo.setScope(scope);
153+
149154
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
150155

151156
//extract 需要查询的字段信息
@@ -256,16 +261,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
256261
SqlNodeList parentGroupByList,
257262
Set<Tuple2<String, String>> joinFieldSet,
258263
Map<String, String> tableRef,
259-
Map<String, String> fieldRef){
264+
Map<String, String> fieldRef,
265+
String scope){
260266

261267
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
262268
SqlNode parentRightJoinNode = joinNode.getRight();
263269
SqlNode rightNode = leftJoinNode.getRight();
264-
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
265-
Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
270+
271+
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
272+
parentWhere, parentSelectList, parentGroupByList, scope);
273+
Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
274+
queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
266275
boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);
267276

268-
JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef);
277+
JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
278+
parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
269279

270280
String rightTableName = rightTableNameAndAlias.f0;
271281
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -659,12 +669,13 @@ private void extractSelectField(SqlNode selectNode,
659669

660670

661671
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
662-
SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList) {
672+
SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
673+
String scope) {
663674
Tuple2<String, String> tabName = new Tuple2<>("", "");
664675
if(sqlNode.getKind() == IDENTIFIER){
665676
tabName.f0 = sqlNode.toString();
666677
}else{
667-
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList);
678+
AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
668679
tabName.f0 = aliasInfo.getName();
669680
tabName.f1 = aliasInfo.getAlias();
670681
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,16 @@ public class SideSQLParser {
6262

6363
private Map<String, Table> localTableCache = Maps.newHashMap();
6464

65-
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
65+
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String scope) throws SqlParseException {
66+
6667
LOG.info("----------exec original Sql----------");
6768
LOG.info(exeSql);
6869

6970
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
7071
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();
7172
SqlNode sqlNode = flinkPlanner.parse(exeSql);
7273

73-
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null);
74+
parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
7475
queueInfo.offer(sqlNode);
7576
return queueInfo;
7677
}
@@ -90,31 +91,32 @@ public Object parseSql(SqlNode sqlNode,
9091
Queue<Object> queueInfo,
9192
SqlNode parentWhere,
9293
SqlNodeList parentSelectList,
93-
SqlNodeList parentGroupByList){
94+
SqlNodeList parentGroupByList,
95+
String scope){
9496
SqlKind sqlKind = sqlNode.getKind();
9597
switch (sqlKind){
9698
case WITH: {
9799
SqlWith sqlWith = (SqlWith) sqlNode;
98100
SqlNodeList sqlNodeList = sqlWith.withList;
99101
for (SqlNode withAsTable : sqlNodeList) {
100102
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
101-
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
103+
parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
102104
queueInfo.add(sqlWithItem);
103105
}
104-
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
106+
parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
105107
break;
106108
}
107109
case INSERT:
108110
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
109-
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
111+
return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
110112
case SELECT:
111113
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
112114
SqlNode sqlWhere = ((SqlSelect)sqlNode).getWhere();
113115
SqlNodeList selectList = ((SqlSelect)sqlNode).getSelectList();
114116
SqlNodeList groupByList = ((SqlSelect) sqlNode).getGroup();
115117

116118
if(sqlFrom.getKind() != IDENTIFIER){
117-
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList);
119+
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope);
118120
if(result instanceof JoinInfo){
119121
return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
120122
}else if(result instanceof AliasInfo){
@@ -136,7 +138,7 @@ public Object parseSql(SqlNode sqlNode,
136138
Map<String, String> tableRef = Maps.newHashMap();
137139
Map<String, String> fieldRef = Maps.newHashMap();
138140
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
139-
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef);
141+
parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
140142
case AS:
141143
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
142144
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -145,7 +147,7 @@ public Object parseSql(SqlNode sqlNode,
145147
if(info.getKind() == IDENTIFIER){
146148
infoStr = info.toString();
147149
} else {
148-
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList).toString();
150+
infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope).toString();
149151
}
150152

151153
AliasInfo aliasInfo = new AliasInfo();
@@ -158,12 +160,12 @@ public Object parseSql(SqlNode sqlNode,
158160
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
159161
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
160162

161-
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
162-
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
163+
parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
164+
parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
163165
break;
164166
case ORDER_BY:
165167
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
166-
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList);
168+
parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
167169

168170
case LITERAL:
169171
return LITERAL.toString();

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public void exec(String sql,
9393
StreamTableEnvironment tableEnv,
9494
Map<String, Table> tableCache,
9595
StreamQueryConfig queryConfig,
96-
CreateTmpTableParser.SqlParserResult createView) throws Exception {
96+
CreateTmpTableParser.SqlParserResult createView,
97+
String scope) throws Exception {
9798
if(localSqlPluginPath == null){
9899
throw new RuntimeException("need to set localSqlPluginPath");
99100
}
@@ -113,7 +114,7 @@ public void exec(String sql,
113114

114115
SideSQLParser sideSQLParser = new SideSQLParser();
115116
sideSQLParser.setLocalTableCache(localTableCache);
116-
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
117+
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet(), scope);
117118
Object pollObj = null;
118119

119120
while((pollObj = exeQueue.poll()) != null){
@@ -437,9 +438,9 @@ private void joinFun(Object pollObj,
437438
replaceInfo.setTargetTableName(targetTableName);
438439
replaceInfo.setTargetTableAlias(targetTableAlias);
439440

440-
if (!tableEnv.isRegistered(joinInfo.getNewTableName())){
441+
if (!tableEnv.isRegistered(targetTableName)){
441442
Table joinTable = tableEnv.fromDataStream(dsOut);
442-
tableEnv.registerTable(joinInfo.getNewTableName(), joinTable);
443+
tableEnv.registerTable(targetTableName, joinTable);
443444
localTableCache.put(joinInfo.getNewTableName(), joinTable);
444445
}
445446
}

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,9 @@ public static SqlBasicCall buildAsNodeByJoinInfo(JoinInfo joinInfo, SqlNode sqlN
198198
SqlOperator operator = new SqlAsOperator();
199199

200200
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
201-
String joinLeftTableName = joinInfo.getLeftTableName();
202-
String joinLeftTableAlias = joinInfo.getLeftTableAlias();
203-
joinLeftTableName = Strings.isNullOrEmpty(joinLeftTableName) ? joinLeftTableAlias : joinLeftTableName;
204-
String newTableName = buildInternalTableName(joinLeftTableName, SPLIT, joinInfo.getRightTableName());
201+
String newTableName = joinInfo.getNewTableName();
205202
String lefTbAlias = joinInfo.getLeftTableAlias();
203+
206204
if(Strings.isNullOrEmpty(lefTbAlias)){
207205
Set<String> fromTableSet = Sets.newHashSet();
208206
TableUtils.getFromTableInfo(joinInfo.getLeftNode(), fromTableSet);
@@ -704,4 +702,12 @@ public static String buildTableField(String tableName, String fieldName){
704702
}
705703

706704

705+
public static String buildTableNameWithScope(String tableName, String scope){
706+
if(StringUtils.isEmpty(scope)){
707+
return tableName;
708+
}
709+
710+
return tableName + "_" + scope;
711+
}
712+
707713
}

0 commit comments

Comments
 (0)