Skip to content

Commit 82291f9

Browse files
committed
add array type AS keyword support
1 parent 30e57d5 commit 82291f9

2 files changed

Lines changed: 73 additions & 55 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 68 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ public abstract class AbstractTableParser {
4848
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4949

5050
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
51-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
51+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
5252
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5353
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5454

55-
private static Pattern compositeTypeHeadPattern = Pattern.compile(".+<.+<");
56-
private static Pattern compositeTypeTailPattern = Pattern.compile(">\\s*>");
55+
private static Pattern compositeTypeHeadPattern = Pattern.compile(".+<.+<.+");
56+
private static Pattern compositeTypeTailPattern = Pattern.compile(".*>\\s*>.*");
5757

5858
private Map<String, Pattern> patternMap = Maps.newHashMap();
5959

@@ -93,8 +93,8 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
9393

9494
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
9595

96-
ArrayList<String> cache = new ArrayList<>();
97-
boolean currentIsCompositeType = false;
96+
ArrayList<String> buffer = new ArrayList<>();
97+
9898
for(String fieldRow : fieldRows){
9999
fieldRow = fieldRow.trim();
100100

@@ -103,65 +103,58 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
103103
}
104104

105105
// 处理复合类型,例如 ARRAY<ROW<foo INT, bar STRING>>
106-
String[] filedInfoArr;
107106
Matcher headMatcher = compositeTypeHeadPattern.matcher(fieldRow);
108107
Matcher tailMatcher = compositeTypeTailPattern.matcher(fieldRow);
109-
110-
if (tailMatcher.find()) {
111-
cache.add(fieldRow);
112-
currentIsCompositeType = false;
113-
fieldRow = String.join("", cache);
114-
cache.clear();
115-
String[] tmp = fieldRow.split("\\s+");
116-
String[] type = Arrays.copyOfRange(tmp, 1, tmp.length);
117-
filedInfoArr = new String[] {
118-
tmp[0],
119-
String.join(" ", type)
120-
};
121-
} else if (headMatcher.find() || currentIsCompositeType) {
122-
currentIsCompositeType = true;
123-
StringBuilder builder = new StringBuilder();
124-
builder.append(fieldRow);
125-
builder.append(",");
126-
cache.add(builder.toString());
127-
continue;
108+
if (
109+
!tailMatcher.matches() &&
110+
(headMatcher.matches() ||
111+
!buffer.isEmpty())
112+
) {
113+
writeBuffer(buffer, fieldRow);
128114
} else {
129-
filedInfoArr = fieldRow.split("\\s+");
130-
}
131-
132-
if(filedInfoArr.length < 2 ){
133-
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
134-
}
135115

136-
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
137-
if(isMatcherKey){
138-
continue;
139-
}
116+
String[] fieldInfoArr;
117+
if (tailMatcher.matches()) {
118+
buffer.add(fieldRow);
119+
fieldRow = String.join("", buffer);
120+
fieldInfoArr = readBuffer(buffer);
121+
} else {
122+
fieldInfoArr = fieldRow.split("\\s+");
123+
}
140124

141-
//Compatible situation may arise in space in the fieldName
142-
String[] filedNameArr = new String[filedInfoArr.length - 1];
143-
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
144-
String fieldName = String.join(" ", filedNameArr);
145-
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
125+
if (fieldInfoArr.length < 2) {
126+
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
127+
}
146128

129+
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
130+
if (isMatcherKey) {
131+
continue;
132+
}
147133

148-
Class fieldClass = null;
149-
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
134+
//Compatible situation may arise in space in the fieldName
135+
String[] filedNameArr = new String[fieldInfoArr.length - 1];
136+
System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
137+
String fieldName = String.join(" ", filedNameArr);
138+
String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();
139+
140+
Class fieldClass = null;
141+
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
142+
143+
Matcher matcher = charTypePattern.matcher(fieldType);
144+
if (matcher.find()) {
145+
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
146+
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
147+
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
148+
} else {
149+
fieldClass = dbTypeConvertToJavaType(fieldType);
150+
}
150151

151-
Matcher matcher = charTypePattern.matcher(fieldType);
152-
if (matcher.find()) {
153-
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
154-
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
155-
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
156-
} else {
157-
fieldClass = dbTypeConvertToJavaType(fieldType);
152+
tableInfo.addPhysicalMappings(fieldInfoArr[0],fieldInfoArr[0]);
153+
tableInfo.addField(fieldName);
154+
tableInfo.addFieldClass(fieldClass);
155+
tableInfo.addFieldType(fieldType);
156+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
158157
}
159-
160-
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
161-
tableInfo.addField(fieldName);
162-
tableInfo.addFieldClass(fieldClass);
163-
tableInfo.addFieldType(fieldType);
164-
tableInfo.addFieldExtraInfo(fieldExtraInfo);
165158
}
166159

167160
tableInfo.finish();
@@ -206,4 +199,24 @@ protected void addParserHandler(String parserName, Pattern pattern, ITableFieldD
206199
patternMap.put(parserName, pattern);
207200
handlerMap.put(parserName, handler);
208201
}
202+
203+
private void writeBuffer(List<String> buffer, String fieldRow) {
204+
StringBuilder builder = new StringBuilder();
205+
builder.append(fieldRow);
206+
builder.append(",");
207+
buffer.add(builder.toString());
208+
}
209+
210+
private String[] readBuffer(List<String> buffer) {
211+
String fieldRow = String.join("", buffer);
212+
buffer.clear();
213+
String[] tmp = fieldRow.split("\\s+");
214+
String[] type = Arrays.copyOfRange(tmp, 1, tmp.length);
215+
String[] fieldInfoArr = new String[] {
216+
tmp[0],
217+
String.join(" ", type)
218+
};
219+
return fieldInfoArr;
220+
}
221+
209222
}

core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ public static TypeInformation convertToAtomicType(String string) {
160160
}
161161
}
162162

163+
/**
164+
* 目前这个方法未使用,设置当初是想字段声明走统一的词法分析器(分词器)。
165+
* @param fieldStmts
166+
* @return
167+
*/
163168
public static ArrayList<String> fieldStmtLexer(String fieldStmts) {
164169

165170
String stmtStream = fieldStmts;

0 commit comments

Comments
 (0)