Skip to content

Commit 89e282d

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 95873bd + af3dc96 commit 89e282d

5 files changed

Lines changed: 148 additions & 10 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21+
import com.dtstack.flink.sql.util.SqlCheckUtils;
2122
import org.apache.calcite.sql.SqlIdentifier;
2223
import org.apache.calcite.sql.SqlInsert;
2324
import org.apache.flink.sql.parser.dml.RichSqlInsert;
@@ -58,7 +59,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
5859
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
5960
StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner();
6061
FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
61-
62+
SqlCheckUtils.check(stmt);
6263
RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
6364
TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
6465

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,21 @@ private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelec
163163

164164
for (int index = 0; index < selectList.size(); index++) {
165165
if (selectList.get(index).getKind().equals(SqlKind.AS)
166-
|| ((SqlIdentifier) selectList.get(index)).names.size() == 1) {
166+
|| (selectList.get(index).getClass().equals(SqlIdentifier.class)
167+
&& ((SqlIdentifier) selectList.get(index)).names.size() == 1)) {
167168
sqlNodes.add(selectList.get(index));
168169
continue;
169170
}
171+
172+
if (!selectList.get(index).getClass().equals(SqlIdentifier.class)) {
173+
if (selectList.get(index).getKind().equals(SqlKind.LITERAL)) {
174+
throw new IllegalArgumentException(String.format("Constants %s in the SELECT statement must be aliased!",
175+
selectList.get(index).toString()));
176+
}
177+
throw new RuntimeException(String.format("Illegal statement! Please check the statement: %s",
178+
selectList.get(index).toString()));
179+
}
180+
170181
sqlNodes.add(transformToAsNode(selectList.get(index)));
171182
}
172183
sqlSelect.setSelectList(sqlNodes);

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.parser;
2222

@@ -27,6 +27,8 @@
2727
import org.apache.commons.lang3.StringUtils;
2828
import com.google.common.collect.Lists;
2929
import com.google.common.base.Strings;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
import java.util.List;
3234
import java.util.Set;
@@ -41,6 +43,7 @@
4143
*/
4244

4345
public class SqlParser {
46+
private static final Logger LOG = LoggerFactory.getLogger(SqlParser.class);
4447

4548
private static final char SQL_DELIMITER = ';';
4649

@@ -83,13 +86,18 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
8386
}
8487
boolean result = false;
8588
for(IParser sqlParser : sqlParserList){
86-
if(!sqlParser.verify(childSql)){
87-
continue;
88-
}
89+
try {
90+
if (!sqlParser.verify(childSql)) {
91+
continue;
92+
}
8993

90-
sqlParser.parseSql(childSql, sqlTree);
91-
result = true;
92-
break;
94+
sqlParser.parseSql(childSql, sqlTree);
95+
result = true;
96+
break;
97+
} catch (Exception e) {
98+
LOG.error("'{}' parser error, detail info: {}", childSql, e.getMessage(), e);
99+
throw e;
100+
}
93101
}
94102

95103
if(!result){

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2929
import com.dtstack.flink.sql.util.ClassUtil;
3030
import com.dtstack.flink.sql.util.ParseUtils;
31+
import com.dtstack.flink.sql.util.SqlCheckUtils;
3132
import com.dtstack.flink.sql.util.TableUtils;
3233
import com.google.common.base.Preconditions;
3334
import com.google.common.collect.HashBasedTable;
@@ -151,7 +152,7 @@ public void exec(String sql,
151152

152153
} else if (pollSqlNode.getKind() == SELECT) {
153154
Preconditions.checkState(createView != null, "select sql must included by create view");
154-
Table table = tableEnv.sqlQuery(pollObj.toString());
155+
Table table = SqlCheckUtils.sqlQueryWithCheck(tableEnv, pollObj.toString());
155156

156157
if (createView.getFieldsInfoStr() == null) {
157158
tableEnv.registerTable(createView.getTableName(), table);
@@ -310,6 +311,64 @@ private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
310311
return res;
311312
}
312313

314+
/**
315+
* check whether all table fields exist in join condition.
316+
* @param conditionNode
317+
* @param joinScope
318+
*/
319+
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope) {
320+
List<SqlNode> sqlNodeList = Lists.newArrayList();
321+
ParseUtils.parseAnd(conditionNode, sqlNodeList);
322+
for (SqlNode sqlNode : sqlNodeList) {
323+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
324+
throw new RuntimeException("not compare operator.");
325+
}
326+
327+
SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
328+
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
329+
330+
if (leftNode.getKind() == SqlKind.IDENTIFIER) {
331+
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode);
332+
}
333+
334+
if (rightNode.getKind() == SqlKind.IDENTIFIER) {
335+
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode);
336+
}
337+
338+
}
339+
}
340+
341+
/**
342+
* check whether table exists and whether field is in table.
343+
* @param sqlNode
344+
* @param joinScope
345+
* @param conditionNode
346+
*/
347+
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode) {
348+
String tableName = sqlNode.getComponent(0).getSimple();
349+
String fieldName = sqlNode.getComponent(1).getSimple();
350+
JoinScope.ScopeChild scopeChild = joinScope.getScope(tableName);
351+
String tableErrorMsg = "table [%s] is not exist. error condition is [%s]. if you find [%s] is exist, please check AS statement";
352+
Preconditions.checkState(
353+
scopeChild != null,
354+
tableErrorMsg,
355+
tableName,
356+
conditionNode.toString(),
357+
tableName
358+
);
359+
360+
String[] fieldNames = scopeChild.getRowTypeInfo().getFieldNames();
361+
boolean hasField = Arrays.asList(fieldNames).contains(fieldName);
362+
String fieldErrorMsg = "table [%s] has not [%s] field.\n error join condition is [%s]";
363+
Preconditions.checkState(
364+
hasField,
365+
fieldErrorMsg,
366+
tableName,
367+
fieldName,
368+
conditionNode.toString()
369+
);
370+
}
371+
313372
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, AbstractSideTableInfo sideTableInfo) {
314373
List<SqlNode> sqlNodeList = Lists.newArrayList();
315374
ParseUtils.parseAnd(conditionNode, sqlNodeList);
@@ -421,6 +480,7 @@ private void joinFun(Object pollObj,
421480
joinScope.addScope(rightScopeChild);
422481

423482
HashBasedTable<String, String, String> mappingTable = ((JoinInfo) pollObj).getTableFieldRef();
483+
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope);
424484

425485
//获取两个表的所有字段
426486
List<FieldInfo> sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import com.google.common.base.Preconditions;
22+
import org.apache.flink.table.api.Table;
23+
import org.apache.flink.table.api.TableEnvironment;
24+
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
27+
28+
/**
29+
* @program: flinkStreamSQL
30+
* @author: wuren
31+
* @create: 2020/10/13
32+
**/
33+
public class SqlCheckUtils {
34+
35+
private final static Pattern NULL_AS_PATTERN = Pattern.compile("(?i)NULL\\s+AS");
36+
37+
/**
38+
* check SQL before call sqlQuery
39+
* @param tEnv
40+
* @param query
41+
* @return
42+
*/
43+
public static Table sqlQueryWithCheck(TableEnvironment tEnv, String query) {
44+
check(query);
45+
return tEnv.sqlQuery(query);
46+
}
47+
48+
/**
49+
* check SQL before pass into flink planner
50+
* 在传入原生Flink之前校验SQL合法性。
51+
* @param stmt
52+
*/
53+
public static void check(String stmt) {
54+
Matcher matcher = NULL_AS_PATTERN.matcher(stmt);
55+
Preconditions.checkState(!matcher.find(),"NULL AS is not supported. error SQL is [%s]", stmt);
56+
}
57+
58+
}

0 commit comments

Comments
 (0)