Skip to content

Commit 1ed54a0

Browse files
committed
Optimize code structure
1 parent 37969bd commit 1ed54a0

2 files changed

Lines changed: 97 additions & 62 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java

Lines changed: 78 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,20 @@ public class DataTypeUtils {
3030

3131
private DataTypeUtils() {}
3232

33-
public static TypeInformation convertToCompositeType(String string) {
34-
Matcher matcher = COMPOSITE_TYPE_PATTERN.matcher(string);
35-
// TODO 现在只支持ARRAY类型后续可以加入 MAP等类型
36-
if (matcher.find() && ARRAY.equals(matcher.group(1))) {
37-
return convertToArray(string);
38-
} else {
39-
throw new RuntimeException("type " + string + "is not support!");
40-
}
33+
/**
34+
* 现在只支持ARRAY类型后续可以加入 MAP等类型
35+
* @param compositeTypeString
36+
* @return
37+
*/
38+
public static TypeInformation convertToCompositeType(String compositeTypeString) {
39+
Matcher matcher = matchCompositeType(compositeTypeString);
40+
final String errorMsg = "type " + compositeTypeString + "is not support!";
41+
Preconditions.checkState(matcher.find(), errorMsg);
42+
43+
String normalizedType = normalizeType(matcher.group(1));
44+
Preconditions.checkState(ARRAY.equals(normalizedType), errorMsg);
45+
46+
return convertToArray(compositeTypeString);
4147
}
4248

4349
/**
@@ -46,64 +52,56 @@ public static TypeInformation convertToCompositeType(String string) {
4652
* @return
4753
*/
4854
public static TypeInformation convertToArray(String arrayTypeString) {
49-
Matcher matcher = COMPOSITE_TYPE_PATTERN.matcher(arrayTypeString);
50-
if (matcher.find() && ARRAY.equals(matcher.group(1))) {
51-
String elementTypeString = matcher.group(2);
52-
TypeInformation elementType;
53-
if (elementTypeString.toUpperCase().startsWith(ROW)) {
54-
elementType = convertToRow(elementTypeString);
55-
} else {
56-
elementType = convertToAtomicType(elementTypeString);
57-
}
58-
return Types.OBJECT_ARRAY(elementType);
55+
Matcher matcher = matchCompositeType(arrayTypeString);
56+
final String errorMsg = arrayTypeString + "convert to array type error!";
57+
Preconditions.checkState(matcher.find(), errorMsg);
58+
59+
String normalizedType = normalizeType(matcher.group(1));
60+
Preconditions.checkState(ARRAY.equals(normalizedType), errorMsg);
61+
62+
String elementTypeString = matcher.group(2);
63+
TypeInformation elementType;
64+
String normalizedElementType = normalizeType(elementTypeString);
65+
if (normalizedElementType.startsWith(ROW)) {
66+
elementType = convertToRow(elementTypeString);
5967
} else {
60-
throw new RuntimeException(arrayTypeString + "convert to array type error!");
68+
elementType = convertToAtomicType(elementTypeString);
6169
}
6270

71+
return Types.OBJECT_ARRAY(elementType);
6372
}
6473

6574
/**
6675
* 目前ROW里只支持基本类型
67-
* @param string
76+
* @param rowTypeString
6877
*/
69-
public static RowTypeInfo convertToRow(String string) {
70-
Matcher matcher = COMPOSITE_TYPE_PATTERN.matcher(string);
71-
72-
if (matcher.find() &&
73-
ROW.equals(matcher.group(1).toUpperCase())
74-
) {
75-
String elementTypeStr = matcher.group(2);
76-
Iterable<String> typeInfo = splitCompositeTypeFields(elementTypeStr);
77-
Tuple2<TypeInformation[], String[]> tuple = genFieldInfo(typeInfo);
78-
return new RowTypeInfo(tuple.f0, tuple.f1);
79-
} else {
80-
throw new RuntimeException(string + "convert to row type error!");
81-
}
78+
public static RowTypeInfo convertToRow(String rowTypeString) {
79+
Matcher matcher = matchCompositeType(rowTypeString);
80+
final String errorMsg = rowTypeString + "convert to row type error!";
81+
Preconditions.checkState(matcher.find(), errorMsg);
82+
83+
String normalizedType = normalizeType(matcher.group(1));
84+
Preconditions.checkState(ROW.equals(normalizedType), errorMsg);
85+
86+
String elementTypeStr = matcher.group(2);
87+
Iterable<String> fieldInfos = splitCompositeTypeField(elementTypeStr);
88+
Tuple2<TypeInformation[], String[]> info = genFieldInfo(fieldInfos);
89+
return new RowTypeInfo(info.f0, info.f1);
8290
}
8391

84-
private static Iterable<String> splitCompositeTypeFields(String string) {
85-
return Splitter
86-
.on(FIELD_DELIMITER)
87-
.trimResults()
88-
.split(string);
89-
}
9092

91-
private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String> typeInfo) {
93+
94+
private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String> fieldInfos) {
9295
ArrayList<TypeInformation> types = Lists.newArrayList();
9396
ArrayList<String> fieldNames = Lists.newArrayList();
9497

95-
for (String type : typeInfo) {
96-
Iterable<String> fieldInfo = Splitter
97-
.on(TYPE_DELIMITER)
98-
.trimResults()
99-
.omitEmptyStrings()
100-
.split(type);
101-
102-
ArrayList<String> array = Lists.newArrayList(fieldInfo.iterator());
103-
Preconditions.checkState(array.size() == 2, "field info must be name with type");
104-
TypeInformation fieldType = convertToAtomicType(array.get(1));
98+
for (String fieldInfo : fieldInfos) {
99+
Iterable<String> splitedInfo = splitTypeInfo(fieldInfo);
100+
ArrayList<String> info = Lists.newArrayList(splitedInfo.iterator());
101+
Preconditions.checkState(info.size() == 2, "field info must be name with type");
102+
TypeInformation fieldType = convertToAtomicType(info.get(1));
105103
types.add(fieldType);
106-
fieldNames.add(array.get(0));
104+
fieldNames.add(info.get(0));
107105
}
108106

109107
TypeInformation[] typeArray = types.toArray(new TypeInformation[types.size()]);
@@ -117,7 +115,7 @@ private static Tuple2<TypeInformation[], String[]> genFieldInfo(Iterable<String>
117115
* @return
118116
*/
119117
public static TypeInformation convertToAtomicType(String string) {
120-
switch (string.toUpperCase()) {
118+
switch (normalizeType(string)) {
121119
case "VARCHAR":
122120
case "STRING":
123121
return Types.STRING();
@@ -152,4 +150,32 @@ public static TypeInformation convertToAtomicType(String string) {
152150
}
153151
}
154152

153+
private static Iterable<String> splitTypeInfo(String string) {
154+
return Splitter
155+
.on(TYPE_DELIMITER)
156+
.trimResults()
157+
.omitEmptyStrings()
158+
.split(string);
159+
}
160+
161+
private static Iterable<String> splitCompositeTypeField(String string) {
162+
return Splitter
163+
.on(FIELD_DELIMITER)
164+
.trimResults()
165+
.split(string);
166+
}
167+
168+
private static String replaceBlank(String s) {
169+
return s.replaceAll("\\s", " ").trim();
170+
}
171+
172+
private static Matcher matchCompositeType(String s) {
173+
return COMPOSITE_TYPE_PATTERN.matcher(
174+
replaceBlank(s)
175+
);
176+
}
177+
178+
private static String normalizeType(String s) {
179+
return s.toUpperCase().trim();
180+
}
155181
}

core/src/test/java/com/dtstack/flink/sql/util/DataTypeUtilsTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,35 @@ public class DataTypeUtilsTest {
1515

1616
@Test
1717
public void convertToArray() {
18+
String[] normalFieldNames = new String[] { "id", "name" };
19+
TypeInformation[] normalTypes = new TypeInformation[] {Types.INT(), Types.STRING()};
20+
RowTypeInfo rowTypeInfo = new RowTypeInfo(normalTypes, normalFieldNames);
21+
TypeInformation normalAtomicType = Types.OBJECT_ARRAY(Types.STRING());
22+
TypeInformation normalCompositeType = Types.OBJECT_ARRAY(rowTypeInfo);
23+
24+
String atomicStrWithBlank = " ARRAY< STRING\n > ";
25+
String compositeTypeStrWithBlank = " ARRAY< ROW< id INT, name STRING > > ";
26+
27+
TypeInformation atomicArrayTypeWithBlank = DataTypeUtils.convertToArray(atomicStrWithBlank);
28+
TypeInformation compositeTypeArrayTypeWithBlank = DataTypeUtils.convertToArray(compositeTypeStrWithBlank);
29+
30+
Assert.assertTrue(normalAtomicType.equals(atomicArrayTypeWithBlank));
31+
Assert.assertTrue(normalCompositeType.equals(compositeTypeArrayTypeWithBlank));
32+
33+
1834
String atomicStr = "ARRAY<STRING>";
1935
String compositeTypeStr = "ARRAY<ROW<id INT, name STRING>>";
2036

2137
TypeInformation atomicArrayType = DataTypeUtils.convertToArray(atomicStr);
2238
TypeInformation compositeTypeArrayType = DataTypeUtils.convertToArray(compositeTypeStr);
2339

24-
25-
String[] normalFieldNames = new String[] {"id", "name"};
26-
TypeInformation[] normalTypes = new TypeInformation[] {Types.INT(), Types.STRING()};
27-
RowTypeInfo rowTypeInfo = new RowTypeInfo(normalTypes, normalFieldNames);
28-
TypeInformation normalAtomicType = Types.OBJECT_ARRAY(Types.STRING());
29-
TypeInformation normalCompositeType = Types.OBJECT_ARRAY(rowTypeInfo);
30-
3140
Assert.assertTrue(normalAtomicType.equals(atomicArrayType));
3241
Assert.assertTrue(normalCompositeType.equals(compositeTypeArrayType));
3342
}
3443

3544
@Test
3645
public void convertToRow() {
37-
String string = "ROW<id INT, name STRING>";
46+
String string = " ROW < id INT, name STRING > ";
3847
RowTypeInfo rowType = DataTypeUtils.convertToRow(string);
3948

4049
String[] fieldNames = rowType.getFieldNames();
@@ -48,7 +57,7 @@ public void convertToRow() {
4857

4958
@Test
5059
public void convertToAtomicType() {
51-
TypeInformation type = DataTypeUtils.convertToAtomicType("STRING");
52-
Assert.assertTrue(type == Types.STRING());
60+
TypeInformation type = DataTypeUtils.convertToAtomicType(" TIMESTAMP ");
61+
Assert.assertTrue(type == Types.SQL_TIMESTAMP());
5362
}
5463
}

0 commit comments

Comments
 (0)