Skip to content

Commit cb68efb

Browse files
author
xuchao
committed
Merge branch '1.8_release_3.10.x' into v1.10.0_dev
# Conflicts: # core/pom.xml # core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java # core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java # hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java # hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
2 parents 076bb41 + d2c71e9 commit cb68efb

55 files changed

Lines changed: 2060 additions & 130 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

ci/sonar_notify.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22
#参考钉钉文档 https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq
33
sonarreport=$(curl -s http://172.16.100.198:8082/?projectname=dt-insight-engine/flinkStreamSQL)
4-
curl -s "https://oapi.dingtalk.com/robot/send?access_token=71555061297a53d3ac922a6f4d94285d8e23bccdca0c00b4dc6df0a2d49da724" \
4+
curl -s "https://oapi.dingtalk.com/robot/send?access_token=58fd731d8bed3b17708d3aa27e49a7e2c41c7e6545f6c4be3170963a7bba7e2a" \
55
-H "Content-Type: application/json" \
66
-d "{
77
\"msgtype\": \"markdown\",

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ public void open(Configuration parameters) throws Exception {
6767
vo.setFileResolverCachingEnabled(false);
6868
Vertx vertx = Vertx.vertx(vo);
6969
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
70-
setExecutor(new ThreadPoolExecutor(50, 50, 0, TimeUnit.MILLISECONDS,
71-
new LinkedBlockingQueue<>(10000), new DTThreadFactory("clickhouseAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()));
7270
}
7371

7472
}

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ public class Options {
7272
@OptionRequired(description = "log level")
7373
private String logLevel = "info";
7474

75+
@OptionRequired(description = "file add to ship file")
76+
private String addShipfile;
77+
78+
7579
public String getMode() {
7680
return mode;
7781
}
@@ -183,4 +187,13 @@ public String getLogLevel() {
183187
public void setLogLevel(String logLevel) {
184188
this.logLevel = logLevel;
185189
}
190+
191+
public String getAddShipfile() {
192+
return addShipfile;
193+
}
194+
195+
public void setAddShipfile(String addShipfile) {
196+
this.addShipfile = addShipfile;
197+
}
198+
186199
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class CreateTableParser implements IParser {
4141

4242
private static final Pattern PATTERN = Pattern.compile(PATTERN_STR);
4343

44+
private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");
45+
4446
public static CreateTableParser newInstance(){
4547
return new CreateTableParser();
4648
}
@@ -69,18 +71,27 @@ public void parseSql(String sql, SqlTree sqlTree) {
6971
}
7072

7173
private Map parseProp(String propsStr){
72-
String[] strs = propsStr.trim().split("'\\s*,");
74+
propsStr = propsStr.replaceAll("'\\s*,", "'|");
75+
String[] strs = propsStr.trim().split("\\|");
7376
Map<String, Object> propMap = Maps.newHashMap();
7477
for(int i=0; i<strs.length; i++){
7578
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
7679
String key = ss.get(0).trim();
77-
String value = ss.get(1).trim().replaceAll("'", "").trim();
80+
String value = extractValue(ss.get(1).trim());
7881
propMap.put(key, value);
7982
}
8083

8184
return propMap;
8285
}
8386

87+
private String extractValue(String value) {
88+
Matcher matcher = PROP_PATTERN.matcher(value);
89+
if (matcher.find()) {
90+
return matcher.group(1);
91+
}
92+
throw new RuntimeException("[" + value + "] format is invalid");
93+
}
94+
8495
public static class SqlParserResult{
8596

8697
private String tableName;

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.parser;
2221

23-
import org.apache.calcite.config.Lex;
24-
import org.apache.calcite.sql.SqlBasicCall;
2522
import org.apache.calcite.sql.SqlInsert;
2623
import org.apache.calcite.sql.SqlJoin;
2724
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlMatchRecognize;
2925
import org.apache.calcite.sql.SqlNode;
30-
import org.apache.calcite.sql.SqlOrderBy;
3126
import org.apache.calcite.sql.SqlSelect;
32-
import org.apache.calcite.sql.parser.SqlParseException;
33-
import org.apache.calcite.sql.parser.SqlParser;
27+
import org.apache.calcite.sql.SqlNodeList;
28+
import org.apache.calcite.sql.SqlBasicCall;
29+
import org.apache.calcite.sql.SqlMatchRecognize;
30+
import org.apache.calcite.sql.SqlOrderBy;
31+
import org.apache.calcite.sql.SqlIdentifier;
32+
import org.apache.calcite.sql.SqlAsOperator;
33+
import org.apache.calcite.sql.parser.SqlParserPos;
3434
import org.apache.commons.lang3.StringUtils;
3535
import com.google.common.collect.Lists;
3636
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
@@ -51,6 +51,9 @@ public class InsertSqlParser implements IParser {
5151

5252
private FlinkPlanner flinkPlanner = new FlinkPlanner();
5353

54+
// 用来标识当前解析节点的上一层节点是否为 insert 节点
55+
private static Boolean parentIsInsert = false;
56+
5457
@Override
5558
public boolean verify(String sql) {
5659
return StringUtils.isNotBlank(sql) && sql.trim().toLowerCase().startsWith("insert");
@@ -80,13 +83,19 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
8083
SqlNode sqlTarget = ((SqlInsert)sqlNode).getTargetTable();
8184
SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
8285
sqlParseResult.addTargetTable(sqlTarget.toString());
86+
parentIsInsert = true;
8387
parseNode(sqlSource, sqlParseResult);
8488
break;
8589
case SELECT:
86-
SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
87-
if(sqlFrom.getKind() == IDENTIFIER){
90+
SqlSelect sqlSelect = (SqlSelect) sqlNode;
91+
if (parentIsInsert) {
92+
rebuildSelectNode(sqlSelect.getSelectList(), sqlSelect);
93+
}
94+
SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
95+
if (sqlFrom.getKind() == IDENTIFIER) {
8896
sqlParseResult.addSourceTable(sqlFrom.toString());
89-
}else{
97+
} else {
98+
parentIsInsert = false;
9099
parseNode(sqlFrom, sqlParseResult);
91100
}
92101
break;
@@ -143,6 +152,44 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
143152
}
144153
}
145154

155+
/**
156+
* 将第一层 select 中的 sqlNode 转化为 AsNode,解决字段名冲突问题
157+
* 仅对 table.xx 这种类型的字段进行替换
158+
* @param selectList select Node 的 select 字段
159+
* @param sqlSelect 第一层解析出来的 selectNode
160+
*/
161+
private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelect) {
162+
SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());
163+
164+
for (int index = 0; index < selectList.size(); index++) {
165+
if (selectList.get(index).getKind().equals(SqlKind.AS)
166+
|| ((SqlIdentifier) selectList.get(index)).names.size() == 1) {
167+
sqlNodes.add(selectList.get(index));
168+
continue;
169+
}
170+
sqlNodes.add(transformToAsNode(selectList.get(index)));
171+
}
172+
sqlSelect.setSelectList(sqlNodes);
173+
}
174+
175+
/**
176+
* 将 sqlNode 转化为 AsNode
177+
* @param sqlNode 需要转化的 sqlNode
178+
* @return 重新构造的 AsNode
179+
*/
180+
public static SqlBasicCall transformToAsNode(SqlNode sqlNode) {
181+
String asName = "";
182+
SqlParserPos pos = new SqlParserPos(sqlNode.getParserPosition().getLineNum(),
183+
sqlNode.getParserPosition().getEndColumnNum());
184+
if (sqlNode.getKind().equals(SqlKind.IDENTIFIER)) {
185+
asName = ((SqlIdentifier) sqlNode).names.get(1);
186+
}
187+
SqlNode[] operands = new SqlNode[2];
188+
operands[0] = sqlNode;
189+
operands[1] = new SqlIdentifier(asName, null, pos);
190+
return new SqlBasicCall(new SqlAsOperator(), operands, pos);
191+
}
192+
146193
public static class SqlParseResult {
147194

148195
private List<String> sourceTableList = Lists.newArrayList();

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public static SqlTree parseSql(String sql) throws Exception {
6868
throw new RuntimeException("need to set local sql plugin root");
6969
}
7070

71-
sql = sql.replaceAll("--.*", "")
71+
sql = DtStringUtil.dealSqlComment(sql)
7272
.replaceAll("\r\n", " ")
7373
.replaceAll("\n", " ")
7474
.replace("\t", " ").trim();
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import org.apache.commons.io.FileUtils;
22+
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
/**
29+
* Utility methods for helping with security tasks.
30+
* Date: 2019/12/28
31+
* Company: www.dtstack.com
32+
* @author maqi
33+
*/
34+
public class AuthUtil {
35+
36+
public static String creatJaasFile(String prefix, String suffix, JAASConfig jaasConfig) throws IOException {
37+
File krbConf = new File(System.getProperty("user.dir"));
38+
File temp = File.createTempFile(prefix, suffix, krbConf);
39+
temp.deleteOnExit();
40+
FileUtils.writeStringToFile(temp, jaasConfig.toString());
41+
return temp.getAbsolutePath();
42+
}
43+
44+
45+
public static class JAASConfig {
46+
private String entryName;
47+
private String loginModule;
48+
private String loginModuleFlag;
49+
private Map<String, String> loginModuleOptions;
50+
51+
public JAASConfig(String entryName, String loginModule, String loginModuleFlag, Map<String, String> loginModuleOptions) {
52+
this.entryName = entryName;
53+
this.loginModule = loginModule;
54+
this.loginModuleFlag = loginModuleFlag;
55+
this.loginModuleOptions = loginModuleOptions;
56+
}
57+
58+
public static Builder builder() {
59+
return new Builder();
60+
}
61+
62+
@Override
63+
public String toString() {
64+
StringBuilder stringBuilder = new StringBuilder(entryName).append(" {\n\t")
65+
.append(loginModule).append(" ").append(loginModuleFlag).append("\n\t");
66+
String[] keys = loginModuleOptions.keySet().toArray(new String[loginModuleOptions.size()]);
67+
for (int i = 0; i < keys.length; i++) {
68+
stringBuilder.append(keys[i]).append("=").append(loginModuleOptions.get(keys[i]));
69+
if (i != keys.length - 1) {
70+
stringBuilder.append("\n\t");
71+
} else {
72+
stringBuilder.append(";\n");
73+
}
74+
75+
}
76+
stringBuilder.append("\n").append("};");
77+
return stringBuilder.toString();
78+
}
79+
80+
public static class Builder {
81+
private String entryName;
82+
private String loginModule;
83+
private String loginModuleFlag;
84+
private Map<String, String> loginModuleOptions;
85+
86+
public Builder setEntryName(String entryName) {
87+
this.entryName = entryName;
88+
return this;
89+
}
90+
91+
public Builder setLoginModule(String loginModule) {
92+
this.loginModule = loginModule;
93+
return this;
94+
}
95+
96+
public Builder setLoginModuleFlag(String loginModuleFlag) {
97+
this.loginModuleFlag = loginModuleFlag;
98+
return this;
99+
}
100+
101+
public Builder setLoginModuleOptions(Map<String, String> loginModuleOptions) {
102+
this.loginModuleOptions = loginModuleOptions;
103+
return this;
104+
}
105+
106+
public JAASConfig build() {
107+
return new JAASConfig(
108+
entryName, loginModule, loginModuleFlag, loginModuleOptions);
109+
}
110+
}
111+
}
112+
}

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,57 @@ public static String replaceIgnoreQuota(String str, String oriStr, String replac
106106
return str.replaceAll(splitPatternStr, replaceStr);
107107
}
108108

109+
/**
110+
* 处理 sql 中 "--" 注释,而不删除引号内的内容
111+
*
112+
* @param sql 解析出来的 sql
113+
* @return 返回无注释内容的 sql
114+
*/
115+
public static String dealSqlComment(String sql) {
116+
boolean inQuotes = false;
117+
boolean inSingleQuotes = false;
118+
int bracketLeftNum = 0;
119+
StringBuilder b = new StringBuilder(sql.length());
120+
char[] chars = sql.toCharArray();
121+
for (int index = 0; index < chars.length; index ++) {
122+
if (index == chars.length) {
123+
return b.toString();
124+
}
125+
StringBuilder tempSb = new StringBuilder(2);
126+
if (index > 1) {
127+
tempSb.append(chars[index - 1]);
128+
tempSb.append(chars[index]);
129+
}
130+
131+
if (tempSb.toString().equals("--")) {
132+
if (inQuotes) {
133+
b.append(chars[index]);
134+
} else if (inSingleQuotes) {
135+
b.append(chars[index]);
136+
} else if (bracketLeftNum > 0) {
137+
b.append(chars[index]);
138+
} else {
139+
b.deleteCharAt(b.length() - 1);
140+
while (chars[index] != '\n') {
141+
// 判断注释内容是不是行尾或者 sql 的最后一行
142+
if (index == chars.length - 1) {
143+
break;
144+
}
145+
index++;
146+
}
147+
}
148+
} else if (chars[index] == '\"' && '\\' != chars[index] && !inSingleQuotes) {
149+
inQuotes = !inQuotes;
150+
b.append(chars[index]);
151+
} else if (chars[index] == '\'' && '\\' != chars[index] && !inQuotes) {
152+
inSingleQuotes = !inSingleQuotes;
153+
b.append(chars[index]);
154+
} else {
155+
b.append(chars[index]);
156+
}
157+
}
158+
return b.toString();
159+
}
109160

110161
public static String col2string(Object column, String type) {
111162
String rowData = column.toString();

0 commit comments

Comments
 (0)