Skip to content

Commit f6ce914

Browse files
committed
optimize structure
1 parent 1ed54a0 commit f6ce914

2 files changed

Lines changed: 15 additions & 18 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,15 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
103103
}
104104

105105
// 处理复合类型,例如 ARRAY<ROW<foo INT, bar STRING>>
106+
// 把ARRAY类型的长串字符压入Buffer
106107
Matcher headMatcher = compositeTypeHeadPattern.matcher(fieldRow);
107108
Matcher tailMatcher = compositeTypeTailPattern.matcher(fieldRow);
108-
if (
109-
!tailMatcher.matches() &&
110-
(headMatcher.matches() ||
111-
!buffer.isEmpty())
112-
) {
109+
boolean isNotTail = !tailMatcher.matches();
110+
boolean isToNeedPush = headMatcher.matches() || !buffer.isEmpty();
111+
112+
if (isNotTail && isToNeedPush) {
113113
writeBuffer(buffer, fieldRow);
114114
} else {
115-
116115
String[] fieldInfoArr;
117116
if (tailMatcher.matches()) {
118117
buffer.add(fieldRow);
@@ -122,9 +121,8 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
122121
fieldInfoArr = fieldRow.split("\\s+");
123122
}
124123

125-
if (fieldInfoArr.length < 2) {
126-
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
127-
}
124+
String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
125+
Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);
128126

129127
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
130128
if (isMatcherKey) {
@@ -149,7 +147,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
149147
fieldClass = dbTypeConvertToJavaType(fieldType);
150148
}
151149

152-
tableInfo.addPhysicalMappings(fieldInfoArr[0],fieldInfoArr[0]);
150+
tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
153151
tableInfo.addField(fieldName);
154152
tableInfo.addFieldClass(fieldClass);
155153
tableInfo.addFieldType(fieldType);

core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,23 @@ public static RowTypeInfo convertToRow(String rowTypeString) {
8484
Preconditions.checkState(ROW.equals(normalizedType), errorMsg);
8585

8686
String elementTypeStr = matcher.group(2);
87-
Iterable<String> fieldInfos = splitCompositeTypeField(elementTypeStr);
88-
Tuple2<TypeInformation[], String[]> info = genFieldInfo(fieldInfos);
87+
Iterable<String> fieldInfoStrs = splitCompositeTypeField(elementTypeStr);
88+
Tuple2<TypeInformation[], String[]> info = genFieldInfo(fieldInfoStrs);
8989
return new RowTypeInfo(info.f0, info.f1);
9090
}
9191

92-
93-
94-
private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String> fieldInfos) {
92+
private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String> fieldInfoStrs) {
9593
ArrayList<TypeInformation> types = Lists.newArrayList();
9694
ArrayList<String> fieldNames = Lists.newArrayList();
9795

98-
for (String fieldInfo : fieldInfos) {
99-
Iterable<String> splitedInfo = splitTypeInfo(fieldInfo);
96+
for (String fieldStr : fieldInfoStrs) {
97+
Iterable<String> splitedInfo = splitTypeInfo(fieldStr);
10098
ArrayList<String> info = Lists.newArrayList(splitedInfo.iterator());
10199
Preconditions.checkState(info.size() == 2, "field info must be name with type");
100+
101+
fieldNames.add(info.get(0));
102102
TypeInformation fieldType = convertToAtomicType(info.get(1));
103103
types.add(fieldType);
104-
fieldNames.add(info.get(0));
105104
}
106105

107106
TypeInformation[] typeArray = types.toArray(new TypeInformation[types.size()]);

0 commit comments

Comments
 (0)