Skip to content

Commit 26bbe35

Browse files
author
yanxi0227
committed
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v1.5.0_dev
2 parents a4b18db + 7406d68 commit 26bbe35

10 files changed

Lines changed: 64 additions & 23 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql;
2222

@@ -34,6 +34,7 @@
3434
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
3535
import com.dtstack.flink.sql.util.FlinkUtil;
3636
import com.dtstack.flink.sql.util.PluginUtil;
37+
import org.apache.calcite.config.Lex;
3738
import org.apache.calcite.sql.SqlInsert;
3839
import org.apache.calcite.sql.SqlNode;
3940
import org.apache.commons.cli.CommandLine;
@@ -185,7 +186,12 @@ public static void main(String[] args) throws Exception {
185186
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
186187
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
187188
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
188-
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql).parseStmt();
189+
190+
org.apache.calcite.sql.parser.SqlParser.Config config = org.apache.calcite.sql.parser.SqlParser
191+
.configBuilder()
192+
.setLex(Lex.MYSQL)
193+
.build();
194+
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql,config).parseStmt();
189195
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
190196
tmp.setExecSql(tmpSql);
191197
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
@@ -246,7 +252,7 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
246252
classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
247253
}
248254
classLoader.loadClass(funcInfo.getClassName());
249-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName().toUpperCase(),
255+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
250256
tableEnv, classLoader);
251257
}
252258
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public boolean verify(String sql) {
5454
public void parseSql(String sql, SqlTree sqlTree) {
5555
Matcher matcher = PATTERN.matcher(sql);
5656
if(matcher.find()){
57-
String tableName = matcher.group(1).toUpperCase();
57+
String tableName = matcher.group(1);
5858
String fieldsInfoStr = matcher.group(2);
5959
String propsStr = matcher.group(3);
6060
Map<String, Object> props = parseProp(propsStr);

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24+
import org.apache.calcite.config.Lex;
2425
import org.apache.calcite.sql.*;
2526
import org.apache.calcite.sql.parser.SqlParseException;
2627
import org.apache.calcite.sql.parser.SqlParser;
@@ -62,11 +63,14 @@ public void parseSql(String sql, SqlTree sqlTree) {
6263
String tableName = null;
6364
String selectSql = null;
6465
if(matcher.find()) {
65-
tableName = matcher.group(1).toUpperCase();
66+
tableName = matcher.group(1);
6667
selectSql = "select " + matcher.group(2);
6768
}
68-
69-
SqlParser sqlParser = SqlParser.create(selectSql);
69+
SqlParser.Config config = SqlParser
70+
.configBuilder()
71+
.setLex(Lex.MYSQL)
72+
.build();
73+
SqlParser sqlParser = SqlParser.create(selectSql,config);
7074
SqlNode sqlNode = null;
7175
try {
7276
sqlNode = sqlParser.parseStmt();
@@ -89,7 +93,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
8993
String tableName = null;
9094
String fieldsInfoStr = null;
9195
if (matcher.find()){
92-
tableName = matcher.group(1).toUpperCase();
96+
tableName = matcher.group(1);
9397
fieldsInfoStr = matcher.group(2);
9498
}
9599
CreateTmpTableParser.SqlParserResult sqlParseResult = new CreateTmpTableParser.SqlParserResult();

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.parser;
2222

23+
import org.apache.calcite.config.Lex;
2324
import org.apache.calcite.sql.SqlBasicCall;
2425
import org.apache.calcite.sql.SqlInsert;
2526
import org.apache.calcite.sql.SqlJoin;
@@ -57,7 +58,11 @@ public static InsertSqlParser newInstance(){
5758

5859
@Override
5960
public void parseSql(String sql, SqlTree sqlTree) {
60-
SqlParser sqlParser = SqlParser.create(sql);
61+
SqlParser.Config config = SqlParser
62+
.configBuilder()
63+
.setLex(Lex.MYSQL)
64+
.build();
65+
SqlParser sqlParser = SqlParser.create(sql,config);
6166
SqlNode sqlNode = null;
6267
try {
6368
sqlNode = sqlParser.parseStmt();

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24+
import org.apache.calcite.config.Lex;
2425
import org.apache.calcite.sql.JoinType;
2526
import org.apache.calcite.sql.SqlAsOperator;
2627
import org.apache.calcite.sql.SqlBasicCall;
@@ -52,11 +53,14 @@
5253
public class SideSQLParser {
5354

5455
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
55-
exeSql = DtStringUtil.replaceIgnoreQuota(exeSql, "`", "");
5656
System.out.println("---exeSql---");
5757
System.out.println(exeSql);
5858
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
59-
SqlParser sqlParser = SqlParser.create(exeSql);
59+
SqlParser.Config config = SqlParser
60+
.configBuilder()
61+
.setLex(Lex.MYSQL)
62+
.build();
63+
SqlParser sqlParser = SqlParser.create(exeSql,config);
6064
SqlNode sqlNode = sqlParser.parseStmt();
6165
parseSql(sqlNode, sideTableSet, queueInfo);
6266
queueInfo.offer(sqlNode);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
645645
String[] filedNameArr = new String[filed.length - 1];
646646
System.arraycopy(filed, 0, filedNameArr, 0, filed.length - 1);
647647
String fieldName = String.join(" ", filedNameArr);
648-
fieldNames.add(fieldName.toUpperCase());
648+
fieldNames.add(fieldName);
649649
String fieldType = filed[filed.length - 1 ].trim();
650650
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
651651
Class tableField = table.getSchema().getType(i).get().getTypeClass();

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,6 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
8282
String[] fieldRows = DtStringUtil.splitIgnoreQuotaBrackets(fieldsInfo, ",");
8383
for(String fieldRow : fieldRows){
8484
fieldRow = fieldRow.trim();
85-
if(fieldNameNeedsUpperCase()) {
86-
fieldRow = fieldRow.toUpperCase();
87-
}
88-
8985
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
9086

9187
if(isMatcherKey){

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.lang3.StringUtils;
2525

2626
import java.util.ArrayList;
27+
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
2930

@@ -94,7 +95,7 @@ protected List<String> keyColList(Map<String, List<String>> updateKey) {
9495
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
9596
List<String> list = entry.getValue();
9697
for (String col : list) {
97-
if (!keyCols.contains(col)) {
98+
if (!containsIgnoreCase(keyCols,col)) {
9899
keyCols.add(col);
99100
}
100101
}
@@ -107,10 +108,10 @@ public String getUpdateSql(List<String> column, List<String> fullColumn, String
107108
String prefixRight = StringUtils.isBlank(rightTable) ? "" : quoteTable(rightTable) + ".";
108109
List<String> list = new ArrayList<>();
109110
for (String col : fullColumn) {
110-
if (keyCols == null || keyCols.size() == 0 || keyCols.contains(col)) {
111+
if (keyCols == null || keyCols.size() == 0 || containsIgnoreCase(keyCols,col)) {
111112
continue;
112113
}
113-
if (fullColumn == null || column.contains(col)) {
114+
if (fullColumn == null ||containsIgnoreCase(column,col)) {
114115
list.add(prefixLeft + col + "=" + prefixRight + col);
115116
} else {
116117
list.add(prefixLeft + col + "=null");
@@ -157,6 +158,15 @@ public String makeValues(List<String> column) {
157158
return sb.toString();
158159
}
159160

161+
public boolean containsIgnoreCase(List<String> l, String s) {
162+
Iterator<String> it = l.iterator();
163+
while (it.hasNext()) {
164+
if (it.next().equalsIgnoreCase(s))
165+
return true;
166+
}
167+
return false;
168+
}
169+
160170
public String quoteColumn(String column) {
161171
return getStartQuote() + column + getEndQuote();
162172
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.sql.ResultSet;
2525
import java.sql.SQLException;
2626
import java.util.ArrayList;
27+
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
2930

@@ -45,7 +46,7 @@ public boolean isReplaceInsertQuery() throws SQLException {
4546
if (!getRealIndexes().isEmpty()) {
4647
for (List<String> value : getRealIndexes().values()) {
4748
for (String fieldName : getDbSink().getFieldNames()) {
48-
if (value.contains(fieldName)) {
49+
if (containsIgnoreCase(value, fieldName)) {
4950
return true;
5051
}
5152
}
@@ -99,5 +100,12 @@ public void fillFullColumns() throws SQLException {
99100
}
100101
}
101102

102-
103+
public boolean containsIgnoreCase(List<String> l, String s) {
104+
Iterator<String> it = l.iterator();
105+
while (it.hasNext()) {
106+
if (it.next().equalsIgnoreCase(s))
107+
return true;
108+
}
109+
return false;
110+
}
103111
}

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverSink.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ protected List<String> keyColList(Map<String, List<String>> updateKey) {
9292
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
9393
List<String> list = entry.getValue();
9494
for (String col : list) {
95-
if (!keyCols.contains(col)) {
95+
if (!containsIgnoreCase(keyCols,col)) {
9696
keyCols.add(col);
9797
}
9898
}
@@ -108,7 +108,7 @@ public String getUpdateSql(List<String> column, List<String> fullColumn, String
108108
if (keyCols == null || keyCols.size() == 0) {
109109
continue;
110110
}
111-
if (fullColumn == null || column.contains(col)) {
111+
if (fullColumn == null || containsIgnoreCase(column,col)) {
112112
list.add(prefixLeft + col + "=" + prefixRight + col);
113113
} else {
114114
list.add(prefixLeft + col + "=null");
@@ -154,6 +154,14 @@ public String makeValues(List<String> column) {
154154
return sb.toString();
155155
}
156156

157+
public boolean containsIgnoreCase(List<String> l, String s) {
158+
Iterator<String> it = l.iterator();
159+
while (it.hasNext()) {
160+
if (it.next().equalsIgnoreCase(s))
161+
return true;
162+
}
163+
return false;
164+
}
157165
public String quoteColumn(String column) {
158166
return getStartQuote() + column + getEndQuote();
159167
}

0 commit comments

Comments
 (0)