Skip to content

Commit faafbd0

Browse files
committed
add license and doc, update split part
1 parent 7da4ff9 commit faafbd0

4 files changed

Lines changed: 132 additions & 59 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 29 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.table;
2220

2321
import com.dtstack.flink.sql.util.ClassUtil;
@@ -27,7 +25,6 @@
2725
import com.google.common.collect.Maps;
2826
import org.apache.commons.lang3.StringUtils;
2927

30-
import java.util.ArrayList;
3128
import java.util.Arrays;
3229
import java.util.List;
3330
import java.util.Map;
@@ -52,9 +49,6 @@ public abstract class AbstractTableParser {
5249
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5350
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5451

55-
private static Pattern compositeTypeHeadPattern = Pattern.compile(".+<.+<.+");
56-
private static Pattern compositeTypeTailPattern = Pattern.compile(".*>\\s*>.*");
57-
5852
private Map<String, Pattern> patternMap = Maps.newHashMap();
5953

6054
private Map<String, ITableFieldDealHandler> handlerMap = Maps.newHashMap();
@@ -89,76 +83,56 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
8983
return false;
9084
}
9185

92-
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
86+
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
9387

9488
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
9589

96-
ArrayList<String> buffer = new ArrayList<>();
97-
9890
for (String fieldRow : fieldRows) {
9991
fieldRow = fieldRow.trim();
10092

10193
if (StringUtils.isBlank(fieldRow)) {
10294
throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
10395
}
10496

105-
// 处理复合类型,例如 ARRAY<ROW<foo INT, bar STRING>>
106-
// 把ARRAY类型的长串字符压入Buffer
107-
Matcher headMatcher = compositeTypeHeadPattern.matcher(fieldRow);
108-
Matcher tailMatcher = compositeTypeTailPattern.matcher(fieldRow);
109-
boolean isNotTail = !tailMatcher.matches();
110-
boolean isToNeedPush = headMatcher.matches() || !buffer.isEmpty();
97+
String[] fieldInfoArr = fieldRow.split("\\s+");
11198

112-
if (isNotTail && isToNeedPush) {
113-
writeBuffer(buffer, fieldRow);
114-
} else {
115-
String[] fieldInfoArr;
116-
if (tailMatcher.matches()) {
117-
buffer.add(fieldRow);
118-
fieldRow = String.join("", buffer);
119-
fieldInfoArr = readBuffer(buffer);
120-
} else {
121-
fieldInfoArr = fieldRow.split("\\s+");
122-
}
99+
String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
100+
Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);
123101

124-
String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
125-
Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);
102+
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
103+
if (isMatcherKey) {
104+
continue;
105+
}
126106

127-
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
128-
if (isMatcherKey) {
129-
continue;
130-
}
107+
//Compatible situation may arise in space in the fieldName
108+
String[] filedNameArr = new String[fieldInfoArr.length - 1];
109+
System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
110+
String fieldName = String.join(" ", filedNameArr);
111+
String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();
131112

132-
//Compatible situation may arise in space in the fieldName
133-
String[] filedNameArr = new String[fieldInfoArr.length - 1];
134-
System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
135-
String fieldName = String.join(" ", filedNameArr);
136-
String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();
137-
138-
Class fieldClass = null;
139-
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
140-
141-
Matcher matcher = charTypePattern.matcher(fieldType);
142-
if (matcher.find()) {
143-
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
144-
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
145-
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
146-
} else {
147-
fieldClass = dbTypeConvertToJavaType(fieldType);
148-
}
113+
Class fieldClass = null;
114+
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
149115

150-
tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
151-
tableInfo.addField(fieldName);
152-
tableInfo.addFieldClass(fieldClass);
153-
tableInfo.addFieldType(fieldType);
154-
tableInfo.addFieldExtraInfo(fieldExtraInfo);
116+
Matcher matcher = charTypePattern.matcher(fieldType);
117+
if (matcher.find()) {
118+
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
119+
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
120+
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
121+
} else {
122+
fieldClass = dbTypeConvertToJavaType(fieldType);
155123
}
124+
125+
tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
126+
tableInfo.addField(fieldName);
127+
tableInfo.addFieldClass(fieldClass);
128+
tableInfo.addFieldType(fieldType);
129+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
156130
}
157131

158132
tableInfo.finish();
159133
}
160134

161-
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo){
135+
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
162136
String primaryFields = matcher.group(1).trim();
163137
String[] splitArry = primaryFields.split(",");
164138
List<String> primaryKes = Lists.newArrayList(splitArry);

core/src/main/java/com/dtstack/flink/sql/util/DataTypeUtils.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.dtstack.flink.sql.util;
220

321
import com.google.common.base.Preconditions;
4-
import com.google.common.base.Strings;
5-
import com.google.common.collect.Iterators;
622
import com.google.common.collect.Lists;
723
import org.apache.flink.api.common.typeinfo.TypeInformation;
824
import org.apache.flink.api.java.tuple.Tuple2;
@@ -11,7 +27,6 @@
1127
import org.apache.flink.table.api.Types;
1228

1329
import java.util.ArrayList;
14-
import java.util.List;
1530
import java.util.regex.Matcher;
1631
import java.util.regex.Pattern;
1732

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
8989
} else if (c == ')' && !inSingleQuotes && !inQuotes) {
9090
bracketLeftNum--;
9191
b.append(c);
92+
} else if (c == '<' && !inSingleQuotes && !inQuotes) {
93+
bracketLeftNum++;
94+
b.append(c);
95+
} else if (c == '>' && !inSingleQuotes && !inQuotes) {
96+
bracketLeftNum--;
97+
b.append(c);
9298
} else {
9399
b.append(c);
94100
}

docs/plugin/kafkaSource.md

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,85 @@ CREATE TABLE MyTable(
130130
);
131131
```
132132

133-
数组类型字段解析示例
133+
#### 数组类型字段解析示例
134+
135+
声明数据类型为Array可以使用CROSS JOIN UNNEST语句对Array类型进行展开。(新模式,推荐)
136+
137+
json样例
138+
```
139+
{
140+
"id": 2,
141+
"some_users": [
142+
"foo",
143+
"bar"
144+
]
145+
}
146+
```
147+
148+
SQL样例
149+
```
150+
CREATE TABLE ods_foo (
151+
id INT,
152+
some_users ARRAY<STRING>
153+
) WITH (
154+
...
155+
);
156+
157+
CREATE TABLE dwd (
158+
id INT,
159+
user_info VARCHAR
161+
) WITH (
162+
type ='console',
163+
);
164+
165+
INSERT INTO dwd
166+
SELECT id, user_info
167+
FROM ods_foo
168+
CROSS JOIN UNNEST(ods_foo.some_users) AS A (user_info);
169+
```
170+
171+
json样例
172+
```
173+
{
174+
"id": 4,
175+
"some_users": [
176+
{
177+
"user_no": 12,
178+
"user_info": "foo"
179+
180+
},
181+
{
182+
"user_no": 14,
183+
"user_info": "bar"
184+
}
185+
]
186+
187+
```
188+
189+
SQL样例
190+
```
191+
CREATE TABLE ods_foo (
192+
id INT,
193+
some_users ARRAY<ROW<user_no INT, user_info STRING>>
194+
) WITH (
195+
...
196+
);
197+
198+
CREATE TABLE dwd (
199+
id INT,
200+
user_no INT,
201+
user_info VARCHAR
202+
) WITH (
203+
type ='console',
204+
);
205+
206+
INSERT INTO dwd
207+
SELECT id, user_no, user_info
208+
FROM ods_foo
209+
CROSS JOIN UNNEST(ods_foo.some_users) AS A (user_no, user_info);
210+
```
211+
##### 旧模式
134212

135213
json: {"name":"tom", "obj":{"channel": "root"}, "user": [{"pv": 4}, {"pv": 10}], "xctime":1572932485}
136214
```

0 commit comments

Comments
 (0)