Skip to content

Commit 3ae08ec

Browse files
committed
case upper and lower
1 parent 21f3020 commit 3ae08ec

7 files changed

Lines changed: 58 additions & 12 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql;
2222

@@ -34,6 +34,7 @@
3434
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
3535
import com.dtstack.flink.sql.util.FlinkUtil;
3636
import com.dtstack.flink.sql.util.PluginUtil;
37+
import org.apache.calcite.config.Lex;
3738
import org.apache.calcite.sql.SqlInsert;
3839
import org.apache.calcite.sql.SqlNode;
3940
import org.apache.commons.cli.CommandLine;
@@ -185,6 +186,11 @@ public static void main(String[] args) throws Exception {
185186
if (sqlTree.getTmpTableMap().containsKey(tableName)) {
186187
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
187188
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
189+
190+
org.apache.calcite.sql.parser.SqlParser.Config config = org.apache.calcite.sql.parser.SqlParser
191+
.configBuilder()
192+
.setLex(Lex.MYSQL)
193+
.build();
188194
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql).parseStmt();
189195
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
190196
tmp.setExecSql(tmpSql);

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24+
import org.apache.calcite.config.Lex;
2425
import org.apache.calcite.sql.*;
2526
import org.apache.calcite.sql.parser.SqlParseException;
2627
import org.apache.calcite.sql.parser.SqlParser;
@@ -65,8 +66,11 @@ public void parseSql(String sql, SqlTree sqlTree) {
6566
tableName = matcher.group(1).toUpperCase();
6667
selectSql = "select " + matcher.group(2);
6768
}
68-
69-
SqlParser sqlParser = SqlParser.create(selectSql);
69+
SqlParser.Config config = SqlParser
70+
.configBuilder()
71+
.setLex(Lex.MYSQL)
72+
.build();
73+
SqlParser sqlParser = SqlParser.create(selectSql,config);
7074
SqlNode sqlNode = null;
7175
try {
7276
sqlNode = sqlParser.parseStmt();

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.parser;
2222

23+
import org.apache.calcite.config.Lex;
2324
import org.apache.calcite.sql.SqlBasicCall;
2425
import org.apache.calcite.sql.SqlInsert;
2526
import org.apache.calcite.sql.SqlJoin;
@@ -57,7 +58,11 @@ public static InsertSqlParser newInstance(){
5758

5859
@Override
5960
public void parseSql(String sql, SqlTree sqlTree) {
60-
SqlParser sqlParser = SqlParser.create(sql);
61+
SqlParser.Config config = SqlParser
62+
.configBuilder()
63+
.setLex(Lex.MYSQL)
64+
.build();
65+
SqlParser sqlParser = SqlParser.create(sql,config);
6166
SqlNode sqlNode = null;
6267
try {
6368
sqlNode = sqlParser.parseStmt();

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24+
import org.apache.calcite.config.Lex;
2425
import org.apache.calcite.sql.JoinType;
2526
import org.apache.calcite.sql.SqlAsOperator;
2627
import org.apache.calcite.sql.SqlBasicCall;
@@ -56,7 +57,11 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
5657
System.out.println("---exeSql---");
5758
System.out.println(exeSql);
5859
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
59-
SqlParser sqlParser = SqlParser.create(exeSql);
60+
SqlParser.Config config = SqlParser
61+
.configBuilder()
62+
.setLex(Lex.MYSQL)
63+
.build();
64+
SqlParser sqlParser = SqlParser.create(exeSql,config);
6065
SqlNode sqlNode = sqlParser.parseStmt();
6166
parseSql(sqlNode, sideTableSet, queueInfo);
6267
queueInfo.offer(sqlNode);

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.lang3.StringUtils;
2525

2626
import java.util.ArrayList;
27+
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
2930

@@ -94,7 +95,7 @@ protected List<String> keyColList(Map<String, List<String>> updateKey) {
9495
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
9596
List<String> list = entry.getValue();
9697
for (String col : list) {
97-
if (!keyCols.contains(col)) {
98+
if (!containsIgnoreCase(keyCols,col)) {
9899
keyCols.add(col);
99100
}
100101
}
@@ -107,10 +108,10 @@ public String getUpdateSql(List<String> column, List<String> fullColumn, String
107108
String prefixRight = StringUtils.isBlank(rightTable) ? "" : quoteTable(rightTable) + ".";
108109
List<String> list = new ArrayList<>();
109110
for (String col : fullColumn) {
110-
if (keyCols == null || keyCols.size() == 0 || keyCols.contains(col)) {
111+
if (keyCols == null || keyCols.size() == 0 || containsIgnoreCase(keyCols,col)) {
111112
continue;
112113
}
113-
if (fullColumn == null || column.contains(col)) {
114+
if (fullColumn == null ||containsIgnoreCase(column,col)) {
114115
list.add(prefixLeft + col + "=" + prefixRight + col);
115116
} else {
116117
list.add(prefixLeft + col + "=null");
@@ -157,6 +158,15 @@ public String makeValues(List<String> column) {
157158
return sb.toString();
158159
}
159160

161+
public boolean containsIgnoreCase(List<String> l, String s) {
162+
Iterator<String> it = l.iterator();
163+
while (it.hasNext()) {
164+
if (it.next().equalsIgnoreCase(s))
165+
return true;
166+
}
167+
return false;
168+
}
169+
160170
public String quoteColumn(String column) {
161171
return getStartQuote() + column + getEndQuote();
162172
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.sql.ResultSet;
2525
import java.sql.SQLException;
2626
import java.util.ArrayList;
27+
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
2930

@@ -45,7 +46,7 @@ public boolean isReplaceInsertQuery() throws SQLException {
4546
if (!getRealIndexes().isEmpty()) {
4647
for (List<String> value : getRealIndexes().values()) {
4748
for (String fieldName : getDbSink().getFieldNames()) {
48-
if (value.contains(fieldName)) {
49+
if (containsIgnoreCase(value, fieldName)) {
4950
return true;
5051
}
5152
}
@@ -99,5 +100,12 @@ public void fillFullColumns() throws SQLException {
99100
}
100101
}
101102

102-
103+
public boolean containsIgnoreCase(List<String> l, String s) {
104+
Iterator<String> it = l.iterator();
105+
while (it.hasNext()) {
106+
if (it.next().equalsIgnoreCase(s))
107+
return true;
108+
}
109+
return false;
110+
}
103111
}

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverSink.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ protected List<String> keyColList(Map<String, List<String>> updateKey) {
9292
for (Map.Entry<String, List<String>> entry : updateKey.entrySet()) {
9393
List<String> list = entry.getValue();
9494
for (String col : list) {
95-
if (!keyCols.contains(col)) {
95+
if (!containsIgnoreCase(keyCols,col)) {
9696
keyCols.add(col);
9797
}
9898
}
@@ -108,7 +108,7 @@ public String getUpdateSql(List<String> column, List<String> fullColumn, String
108108
if (keyCols == null || keyCols.size() == 0) {
109109
continue;
110110
}
111-
if (fullColumn == null || column.contains(col)) {
111+
if (fullColumn == null || containsIgnoreCase(column,col)) {
112112
list.add(prefixLeft + col + "=" + prefixRight + col);
113113
} else {
114114
list.add(prefixLeft + col + "=null");
@@ -154,6 +154,14 @@ public String makeValues(List<String> column) {
154154
return sb.toString();
155155
}
156156

157+
public boolean containsIgnoreCase(List<String> l, String s) {
158+
Iterator<String> it = l.iterator();
159+
while (it.hasNext()) {
160+
if (it.next().equalsIgnoreCase(s))
161+
return true;
162+
}
163+
return false;
164+
}
157165
public String quoteColumn(String column) {
158166
return getStartQuote() + column + getEndQuote();
159167
}

0 commit comments

Comments
 (0)