11package com .dtstack .flink .sql .util ;
22
3+ import com .google .common .base .Preconditions ;
34import com .google .common .base .Strings ;
45import com .google .common .collect .Iterators ;
56import com .google .common .collect .Lists ;
2122 **/
2223public class DataTypeUtils {
2324
24- private final static Pattern COMPOUND_TYPE_PATTERN = Pattern .compile ("(.+?)<(.+)>" );
25+ private final static Pattern COMPOSITE_TYPE_PATTERN = Pattern .compile ("(.+?)<(.+)>" );
2526 private final static String ARRAY = "ARRAY" ;
2627 private final static String ROW = "ROW" ;
2728 private final static char FIELD_DELIMITER = ',' ;
2829 private final static char TYPE_DELIMITER = ' ' ;
2930
30- private final static Pattern ATOMIC_TYPE_PATTERN = Pattern .compile ("^\\ s*(\\ w+)\\ s+(\\ w+)\\ s*," );
31- private final static Pattern COMPLEX_TYPE_PATTERN = Pattern .compile ("^\\ s*(\\ w+)\\ s+(.+?<.+?>)\\ s*," );
32- private final static Pattern ATOMIC_TAIL_PATTERN = Pattern .compile ("^\\ s*(\\ w+)\\ s+(\\ w+)\\ s*$" );
33- private final static Pattern COMPLEX_TAIL_PATTERN = Pattern .compile ("^\\ s*(\\ w+)\\ s+(.+?<.+>)\\ s*$" );
34-
3531 private DataTypeUtils () {}
3632
37- public static TypeInformation convertToCollectionType (String string ) {
38- Matcher matcher = COMPOUND_TYPE_PATTERN .matcher (string );
33+ public static TypeInformation convertToCompositeType (String string ) {
34+ Matcher matcher = COMPOSITE_TYPE_PATTERN .matcher (string );
3935 // TODO 现在只支持ARRAY类型后续可以加入 MAP等类型
4036 if (matcher .find () && ARRAY .equals (matcher .group (1 ))) {
4137 return convertToArray (string );
4238 } else {
43- throw new RuntimeException ("" );
39+ throw new RuntimeException ("type " + string + "is not support! " );
4440 }
4541 }
4642
@@ -50,7 +46,7 @@ public static TypeInformation convertToCollectionType(String string) {
5046 * @return
5147 */
5248 public static TypeInformation convertToArray (String arrayTypeString ) {
53- Matcher matcher = COMPOUND_TYPE_PATTERN .matcher (arrayTypeString );
49+ Matcher matcher = COMPOSITE_TYPE_PATTERN .matcher (arrayTypeString );
5450 if (matcher .find () && ARRAY .equals (matcher .group (1 ))) {
5551 String elementTypeString = matcher .group (2 );
5652 TypeInformation elementType ;
@@ -61,7 +57,7 @@ public static TypeInformation convertToArray(String arrayTypeString) {
6157 }
6258 return Types .OBJECT_ARRAY (elementType );
6359 } else {
64- throw new RuntimeException ("convert to array type error!" );
60+ throw new RuntimeException (arrayTypeString + "convert to array type error!" );
6561 }
6662
6763 }
@@ -71,19 +67,17 @@ public static TypeInformation convertToArray(String arrayTypeString) {
7167 * @param string
7268 */
7369 public static RowTypeInfo convertToRow (String string ) {
74- Matcher matcher = COMPOUND_TYPE_PATTERN .matcher (string );
70+ Matcher matcher = COMPOSITE_TYPE_PATTERN .matcher (string );
7571
7672 if (matcher .find () &&
77- ROW .equals (
78- matcher .group (1 ).toUpperCase ()
79- )
73+ ROW .equals (matcher .group (1 ).toUpperCase ())
8074 ) {
8175 String elementTypeStr = matcher .group (2 );
8276 Iterable <String > typeInfo = splitCompositeTypeFields (elementTypeStr );
8377 Tuple2 <TypeInformation [], String []> tuple = genFieldInfo (typeInfo );
8478 return new RowTypeInfo (tuple .f0 , tuple .f1 );
8579 } else {
86- throw new RuntimeException (" " );
80+ throw new RuntimeException (string + "convert to row type error! " );
8781 }
8882 }
8983
@@ -95,28 +89,26 @@ private static Iterable<String> splitCompositeTypeFields(String string) {
9589 }
9690
9791 private static Tuple2 <TypeInformation [], String []> genFieldInfo (Iterable <String > typeInfo ) {
98- int fieldsSize = Iterators .size (typeInfo .iterator ());
9992 ArrayList <TypeInformation > types = Lists .newArrayList ();
10093 ArrayList <String > fieldNames = Lists .newArrayList ();
94+
10195 for (String type : typeInfo ) {
10296 Iterable <String > fieldInfo = Splitter
10397 .on (TYPE_DELIMITER )
10498 .trimResults ()
10599 .omitEmptyStrings ()
106100 .split (type );
101+
107102 ArrayList <String > array = Lists .newArrayList (fieldInfo .iterator ());
108- if (array .size () == 2 ) {
109- TypeInformation fieldType = convertToAtomicType (array .get (1 ));
110- types .add (fieldType );
111- fieldNames .add (array .get (0 ));
112- } else {
113- throw new RuntimeException ();
114- }
103+ Preconditions .checkState (array .size () == 2 , "field info must be name with type" );
104+ TypeInformation fieldType = convertToAtomicType (array .get (1 ));
105+ types .add (fieldType );
106+ fieldNames .add (array .get (0 ));
115107 }
116108
117- TypeInformation [] typesArray = types .toArray (new TypeInformation [types .size ()]);
118- String [] fieldNamesArray = fieldNames .toArray (new String [fieldNames .size ()]);
119- return Tuple2 .of (typesArray , fieldNamesArray );
109+ TypeInformation [] typeArray = types .toArray (new TypeInformation [types .size ()]);
110+ String [] fieldNameArray = fieldNames .toArray (new String [fieldNames .size ()]);
111+ return Tuple2 .of (typeArray , fieldNameArray );
120112 }
121113
122114 /**
@@ -156,42 +148,8 @@ public static TypeInformation convertToAtomicType(String string) {
156148 case "TIMESTAMP" :
157149 return Types .SQL_TIMESTAMP ();
158150 default :
159- throw new RuntimeException ("type " + string + "not supported" );
151+ throw new RuntimeException ("type " + string + "not supported, please refer to the flink doc! " );
160152 }
161153 }
162154
163- /**
164- * 目前这个方法未使用,设置当初是想字段声明走统一的词法分析器(分词器)。
165- * @param fieldStmts
166- * @return
167- */
168- public static ArrayList <String > fieldStmtLexer (String fieldStmts ) {
169-
170- String stmtStream = fieldStmts ;
171- ArrayList <String > tokens = new ArrayList <>();
172- while (Strings .isNullOrEmpty (stmtStream )) {
173- Matcher atomicTypeMatcher = ATOMIC_TYPE_PATTERN .matcher (stmtStream );
174- Matcher complexTypeMatcher = COMPLEX_TYPE_PATTERN .matcher (stmtStream );
175- Matcher atomicTypeTailMatcher = ATOMIC_TAIL_PATTERN .matcher (stmtStream );
176- Matcher complexTypeTailMatcher = COMPLEX_TAIL_PATTERN .matcher (stmtStream );
177-
178- String fieldStmt ;
179-
180- if (atomicTypeMatcher .find ()) {
181- fieldStmt = atomicTypeMatcher .group (0 );
182- } else if (complexTypeMatcher .find ()) {
183- fieldStmt = complexTypeMatcher .group (0 );
184- } else if (atomicTypeTailMatcher .find ()) {
185- fieldStmt = atomicTypeTailMatcher .group (0 );
186- } else if (complexTypeTailMatcher .find ()) {
187- fieldStmt = complexTypeTailMatcher .group (0 );
188- } else {
189- throw new RuntimeException ("field declaration statement error" + fieldStmts );
190- }
191-
192- tokens .add (fieldStmt );
193- stmtStream = stmtStream .substring (fieldStmt .length () + 1 );
194- }
195- return tokens ;
196- }
197155}
0 commit comments