Skip to content

Commit 30e57d5

Browse files
committed
array type support complete
1 parent 42f2214 commit 30e57d5

3 files changed

Lines changed: 69 additions & 7 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.google.common.collect.Maps;
2828
import org.apache.commons.lang3.StringUtils;
2929

30+
import java.util.ArrayList;
31+
import java.util.Arrays;
3032
import java.util.List;
3133
import java.util.Map;
3234
import java.util.regex.Matcher;
@@ -50,6 +52,9 @@ public abstract class AbstractTableParser {
5052
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5153
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5254

55+
private static Pattern compositeTypeHeadPattern = Pattern.compile(".+<.+<");
56+
private static Pattern compositeTypeTailPattern = Pattern.compile(">\\s*>");
57+
5358
private Map<String, Pattern> patternMap = Maps.newHashMap();
5459

5560
private Map<String, ITableFieldDealHandler> handlerMap = Maps.newHashMap();
@@ -85,16 +90,45 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
8590
}
8691

8792
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
88-
// TODO 替换
93+
8994
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
95+
96+
ArrayList<String> cache = new ArrayList<>();
97+
boolean currentIsCompositeType = false;
9098
for(String fieldRow : fieldRows){
9199
fieldRow = fieldRow.trim();
92100

93101
if(StringUtils.isBlank(fieldRow)){
94102
throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
95103
}
96104

97-
String[] filedInfoArr = fieldRow.split("\\s+");
105+
// 处理复合类型,例如 ARRAY<ROW<foo INT, bar STRING>>
106+
String[] filedInfoArr;
107+
Matcher headMatcher = compositeTypeHeadPattern.matcher(fieldRow);
108+
Matcher tailMatcher = compositeTypeTailPattern.matcher(fieldRow);
109+
110+
if (tailMatcher.find()) {
111+
cache.add(fieldRow);
112+
currentIsCompositeType = false;
113+
fieldRow = String.join("", cache);
114+
cache.clear();
115+
String[] tmp = fieldRow.split("\\s+");
116+
String[] type = Arrays.copyOfRange(tmp, 1, tmp.length);
117+
filedInfoArr = new String[] {
118+
tmp[0],
119+
String.join(" ", type)
120+
};
121+
} else if (headMatcher.find() || currentIsCompositeType) {
122+
currentIsCompositeType = true;
123+
StringBuilder builder = new StringBuilder();
124+
builder.append(fieldRow);
125+
builder.append(",");
126+
cache.add(builder.toString());
127+
continue;
128+
} else {
129+
filedInfoArr = fieldRow.split("\\s+");
130+
}
131+
98132
if(filedInfoArr.length < 2 ){
99133
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
100134
}

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public static Class<?> stringConvertClass(String str) {
6969
case "varchar":
7070
case "char":
7171
case "text":
72+
case "string":
7273
return String.class;
7374

7475
case "real":

core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.dtstack.flink.sql.util;
22

3+
import com.google.common.base.Strings;
34
import com.google.common.collect.Iterators;
45
import com.google.common.collect.Lists;
56
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -9,6 +10,7 @@
910
import org.apache.flink.table.api.Types;
1011

1112
import java.util.ArrayList;
13+
import java.util.List;
1214
import java.util.regex.Matcher;
1315
import java.util.regex.Pattern;
1416

@@ -25,10 +27,10 @@ public class DataTypeUtils {
2527
private final static char FIELD_DELIMITER = ',';
2628
private final static char TYPE_DELIMITER = ' ';
2729

28-
private final static Pattern COMPLEX_TYPE_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(.+?<.+>)\\s*,");
2930
private final static Pattern ATOMIC_TYPE_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(\\w+)\\s*,");
30-
private final static Pattern TAIL_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(.+?<.+>)\\s*");
31-
private final static Pattern ATOMIC_TAIL_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(\\w+)\\s*");
31+
private final static Pattern COMPLEX_TYPE_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(.+?<.+?>)\\s*,");
32+
private final static Pattern ATOMIC_TAIL_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(\\w+)\\s*$");
33+
private final static Pattern COMPLEX_TAIL_PATTERN = Pattern.compile("^\\s*(\\w+)\\s+(.+?<.+>)\\s*$");
3234

3335
private DataTypeUtils() {}
3436

@@ -158,8 +160,33 @@ public static TypeInformation convertToAtomicType(String string) {
158160
}
159161
}
160162

161-
public static void splitFields(String fieldStmt) {
162-
// TODO 可以把每轮字符流开始的空白字符去掉。
163+
public static ArrayList<String> fieldStmtLexer(String fieldStmts) {
164+
165+
String stmtStream = fieldStmts;
166+
ArrayList<String> tokens = new ArrayList<>();
167+
while (Strings.isNullOrEmpty(stmtStream)) {
168+
Matcher atomicTypeMatcher = ATOMIC_TYPE_PATTERN.matcher(stmtStream);
169+
Matcher complexTypeMatcher = COMPLEX_TYPE_PATTERN.matcher(stmtStream);
170+
Matcher atomicTypeTailMatcher = ATOMIC_TAIL_PATTERN.matcher(stmtStream);
171+
Matcher complexTypeTailMatcher = COMPLEX_TAIL_PATTERN.matcher(stmtStream);
172+
173+
String fieldStmt;
174+
175+
if (atomicTypeMatcher.find()) {
176+
fieldStmt = atomicTypeMatcher.group(0);
177+
} else if (complexTypeMatcher.find()) {
178+
fieldStmt = complexTypeMatcher.group(0);
179+
} else if (atomicTypeTailMatcher.find()) {
180+
fieldStmt = atomicTypeTailMatcher.group(0);
181+
} else if (complexTypeTailMatcher.find()) {
182+
fieldStmt = complexTypeTailMatcher.group(0);
183+
} else {
184+
throw new RuntimeException("field declaration statement error" + fieldStmts);
185+
}
163186

187+
tokens.add(fieldStmt);
188+
stmtStream = stmtStream.substring(fieldStmt.length() + 1);
189+
}
190+
return tokens;
164191
}
165192
}

0 commit comments

Comments
 (0)