
Commit b0cb100

Merge branch 'feat_1.10_4.1.x_impalaKudu' into '1.10_test_4.1.x'

impala supports dynamic partitioning. See merge request dt-insight-engine/flinkStreamSQL!142

2 parents 558dc46 + 60d1d0d

35 files changed

Lines changed: 685 additions & 373 deletions


core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 32 additions & 4 deletions
@@ -42,6 +42,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;

 /**
  * source data parse to json format
@@ -64,6 +65,9 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
     private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
     private final String charsetName;

+    private static final Pattern TIMESTAMP_PATTERN = Pattern.compile("^\\d+$");
+    private static final Pattern TIME_FORMAT_PATTERN = Pattern.compile("\\w+\\d+:\\d+:\\d+");
+
     public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
                                           List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
                                           String charsetName) {
@@ -146,11 +150,11 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
             // local zone
-            return Time.valueOf(node.asText());
+            return convertToTime(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
             // local zone
-            return Timestamp.valueOf(node.asText());
-        } else if (info instanceof RowTypeInfo) {
+            return convertToTimestamp(node.asText());
+        } else if (info instanceof RowTypeInfo) {
             return convertRow(node, (RowTypeInfo) info);
         } else if (info instanceof ObjectArrayTypeInfo) {
             return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
@@ -165,6 +169,29 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
         }
     }

+    /**
+     * Convert both formats, "2020-09-07 14:49:10.0" and "1598446699685", to Timestamp
+     */
+    private Timestamp convertToTimestamp(String timestamp) {
+        if (TIMESTAMP_PATTERN.matcher(timestamp).find()) {
+            return new Timestamp(Long.parseLong(timestamp));
+        }
+        if (TIME_FORMAT_PATTERN.matcher(timestamp).find()) {
+            return Timestamp.valueOf(timestamp);
+        }
+        throw new IllegalArgumentException("Incorrect time format of timestamp");
+    }
+
+    private Time convertToTime(String timestamp) {
+        if (TIMESTAMP_PATTERN.matcher(timestamp).find()) {
+            return new Time(Long.parseLong(timestamp));
+        }
+        if (TIME_FORMAT_PATTERN.matcher(timestamp).find()) {
+            return Time.valueOf(timestamp);
+        }
+        throw new IllegalArgumentException("Incorrect time format of time");
+    }
+
     private Row convertTopRow() {
         Row row = new Row(fieldNames.length);
         try {
@@ -175,7 +202,7 @@ private Row convertTopRow() {
                 if (node == null) {
                     if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
                         throw new IllegalStateException("Failed to find field with name '"
-                            + fieldNames[i] + "'.");
+                                + fieldNames[i] + "'.");
                     } else {
                         row.setField(i, null);
                     }
@@ -216,6 +243,7 @@ private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType)
         }
         return array;
     }
+
     @Override
     public TypeInformation<Row> getProducedType() {
         return typeInfo;
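
Note: the two patterns above split incoming strings into pure-digit epoch-millisecond values and date-time literals. A minimal standalone sketch of that dispatch, with an illustrative class name and sample values (not part of the commit):

import java.sql.Timestamp;
import java.util.regex.Pattern;

public class TimestampConvertSketch {
    // Same patterns as in the diff: all digits => epoch millis,
    // otherwise expect a JDBC literal like "2020-09-07 14:49:10.0".
    private static final Pattern TIMESTAMP_PATTERN = Pattern.compile("^\\d+$");
    private static final Pattern TIME_FORMAT_PATTERN = Pattern.compile("\\w+\\d+:\\d+:\\d+");

    static Timestamp convertToTimestamp(String value) {
        if (TIMESTAMP_PATTERN.matcher(value).find()) {
            return new Timestamp(Long.parseLong(value));
        }
        if (TIME_FORMAT_PATTERN.matcher(value).find()) {
            return Timestamp.valueOf(value);
        }
        throw new IllegalArgumentException("Incorrect time format of timestamp");
    }

    public static void main(String[] args) {
        System.out.println(convertToTimestamp("1598446699685"));         // epoch milliseconds
        System.out.println(convertToTimestamp("2020-09-07 14:49:10.0")); // JDBC timestamp literal
    }
}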

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 20 additions & 0 deletions
@@ -28,8 +28,11 @@
 import com.google.common.collect.Lists;
 import com.google.common.base.Strings;

+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;

 /**
  * Reason:
@@ -51,6 +54,8 @@ public static void setLocalSqlPluginRoot(String localSqlPluginRoot){
         LOCAL_SQL_PLUGIN_ROOT = localSqlPluginRoot;
     }

+    private static final Pattern ADD_FIlE_PATTERN = Pattern.compile("(?i).*add\\s+file\\s+.+");
+
     /**
      * flink support sql syntax
      * CREATE TABLE sls_stream() with ();
@@ -70,6 +75,7 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
                 .replace("\t", " ").trim();

         List<String> sqlArr = DtStringUtil.splitIgnoreQuota(sql, SQL_DELIMITER);
+        sqlArr = removeAddFileStmt(sqlArr);
         SqlTree sqlTree = new SqlTree();
         AbstractTableInfoParser tableInfoParser = new AbstractTableInfoParser();
         for(String childSql : sqlArr){
@@ -150,4 +156,18 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti

         return sqlTree;
     }
+
+    /**
+     * remove "add file" statements, e.g. add file /etc/krb5.conf;
+     */
+    private static List<String> removeAddFileStmt(List<String> stmts) {
+        List<String> cleanedStmts = new ArrayList<>();
+        for (String stmt : stmts) {
+            Matcher matcher = ADD_FIlE_PATTERN.matcher(stmt);
+            if(!matcher.matches()) {
+                cleanedStmts.add(stmt);
+            }
+        }
+        return cleanedStmts;
+    }
 }
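
Note: ADD FILE statements (used to ship auxiliary files such as /etc/krb5.conf alongside a job) are not parseable Flink SQL, so they are stripped before the statements reach the parser. A standalone sketch of how the pattern classifies statements (class name and inputs are illustrative):

import java.util.regex.Pattern;

public class AddFileFilterSketch {
    // Same regex as ADD_FIlE_PATTERN in the diff; (?i) makes it case-insensitive.
    private static final Pattern ADD_FILE_PATTERN = Pattern.compile("(?i).*add\\s+file\\s+.+");

    public static void main(String[] args) {
        System.out.println(ADD_FILE_PATTERN.matcher(" add file /etc/krb5.conf ").matches()); // true  -> dropped
        System.out.println(ADD_FILE_PATTERN.matcher(" ADD FILE keytab ").matches());         // true  -> dropped
        System.out.println(ADD_FILE_PATTERN.matcher("INSERT INTO t SELECT 1").matches());    // false -> kept
    }
}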

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 15 additions & 1 deletion
@@ -490,7 +490,7 @@ private Set<String> extractSelectFieldFromJoinCondition(Set<Tuple2<String, Strin
             }

             if(tableRef.containsKey(field.f0)){
-                if(fromTableNameSet.contains(tableRef.get(field.f0))){
+                if(checkContainIterationTableName(fromTableNameSet, field.f0, tableRef)){
                     extractFieldList.add(tableRef.get(field.f0) + "." + field.f1);
                 }
             }
@@ -499,6 +499,20 @@ private Set<String> extractSelectFieldFromJoinCondition(Set<Tuple2<String, Strin
         return extractFieldList;
     }

+    private boolean checkContainIterationTableName(Set<String> fromTableNameSet, String checkTableName, Map<String, String> mappingTableName) {
+        for (int i = 0; i < mappingTableName.size() + 1; i++) {
+            if (fromTableNameSet.contains(checkTableName)) {
+                return true;
+            }
+
+            checkTableName = mappingTableName.get(checkTableName);
+            if (checkTableName == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private Set<String> extractFieldFromGroupByList(SqlNodeList parentGroupByList,
                                                     Set<String> fromTableNameSet,
                                                     Map<String, String> tableRef){
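
Note: the old check only looked one level deep into tableRef, so a field whose alias resolves through several nested-join renames was missed. The new helper walks the alias chain until it reaches a name from the FROM set, bounded by map size + 1 so a cyclic mapping cannot loop forever. A standalone sketch of the same logic with hypothetical aliases:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TableRefChainSketch {
    // Mirrors checkContainIterationTableName from the diff.
    static boolean containsThroughRefs(Set<String> fromTables, String name, Map<String, String> refs) {
        for (int i = 0; i < refs.size() + 1; i++) {
            if (fromTables.contains(name)) {
                return true;
            }
            name = refs.get(name);
            if (name == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> refs = new HashMap<>();
        refs.put("u", "u_join_o");     // hypothetical alias chain from nested joins
        refs.put("u_join_o", "u_tmp");
        Set<String> fromTables = new HashSet<>(Collections.singleton("u_tmp"));
        System.out.println(containsThroughRefs(fromTables, "u", refs)); // true: u -> u_join_o -> u_tmp
    }
}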

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predica
         }
     }

-    private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind, 
+    private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind,
                                          int fieldIndex, int conditionIndex) {
         SqlNode sqlNode = whereNode.getOperands()[fieldIndex];
         if (sqlNode.getKind() == SqlKind.IDENTIFIER) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 3 deletions
@@ -409,10 +409,9 @@ private void joinFun(Object pollObj,

         RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());

-        DataStream adaptStream = tableEnv.toRetractStream(targetTable, Row.class)
+        DataStream adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
                 .filter(f -> f.f0)
-                .map(f -> f.f1)
-                .returns(Row.class);
+                .map(f -> f.f1);

         //join side table before keyby ===> Reducing the size of each dimension table cache of async
         if (sideTableInfo.isPartitionedJoin()) {
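
Note: passing Row.class to toRetractStream loses the named per-field types from the table schema; the commit passes the explicit RowTypeInfo instead, which also makes the trailing returns(Row.class) hint unnecessary. A sketch of the pattern under Flink 1.10's java StreamTableEnvironment (method and variable names are illustrative):

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    static DataStream<Row> toAppendOnly(StreamTableEnvironment tableEnv, Table table) {
        // Carry the schema's field names and types into the stream's type info.
        RowTypeInfo typeInfo = new RowTypeInfo(
                table.getSchema().getFieldTypes(),
                table.getSchema().getFieldNames());
        return tableEnv.toRetractStream(table, typeInfo) // Tuple2<Boolean, Row> elements
                .filter(f -> f.f0)                       // keep accumulate messages, drop retractions
                .map(f -> f.f1)
                .returns(typeInfo);                      // explicit hint for the type-erased lambda
    }
}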

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ public abstract class AbstractTableInfo implements Serializable {
     private final List<String> fieldList = Lists.newArrayList();

     /** key: alias, value: realField */
-    private Map<String, String> physicalFields = Maps.newHashMap();
+    private Map<String, String> physicalFields = Maps.newLinkedHashMap();

     private final List<String> fieldTypeList = Lists.newArrayList();
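
Note: swapping HashMap for LinkedHashMap makes iteration over physicalFields follow field declaration order, which matters wherever column lists are generated from this map. A small illustration with hypothetical fields:

import com.google.common.collect.Maps;
import java.util.Map;

public class FieldOrderSketch {
    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order; HashMap gives no order guarantee.
        Map<String, String> physicalFields = Maps.newLinkedHashMap();
        physicalFields.put("id", "id");            // alias -> real field (illustrative)
        physicalFields.put("name", "user_name");
        physicalFields.put("ts", "event_time");
        physicalFields.forEach((alias, real) -> System.out.println(alias + " -> " + real));
        // always prints id, name, ts — the declaration order
    }
}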

core/src/main/java/com/dtstack/flink/sql/util/DtFileUtils.java

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+
+/**
+ * @program: flinkStreamSQL
+ * @author: wuren
+ * @create: 2020/09/21
+ **/
+public class DtFileUtils {
+    public static void checkExists(String path) {
+        File file = new File(path);
+        String errorMsg = "%s file is not exist!";
+        Preconditions.checkState(file.exists(), errorMsg, path);
+    }
+}
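
Note: a minimal usage sketch of the new utility (the path is illustrative):

import com.dtstack.flink.sql.util.DtFileUtils;

public class CheckExistsSketch {
    public static void main(String[] args) {
        // No-op when the file exists; otherwise Preconditions.checkState throws
        // IllegalStateException with the %s template filled in:
        // "/etc/krb5.conf file is not exist!"
        DtFileUtils.checkExists("/etc/krb5.conf");
    }
}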

core/src/main/java/com/dtstack/flink/sql/util/RowDataConvert.java

Lines changed: 41 additions & 8 deletions
@@ -1,13 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package com.dtstack.flink.sql.util;

 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.DataFormatConverters;
 import org.apache.flink.table.dataformat.GenericRow;
 import org.apache.flink.table.dataformat.SqlTimestamp;
 import org.apache.flink.types.Row;

+import java.sql.Date;
+import java.sql.Time;
 import java.sql.Timestamp;
-
+import java.time.LocalDate;

 /**
  * Company: www.dtstack.com
@@ -17,16 +38,28 @@
  */
 public class RowDataConvert {

-    public static BaseRow convertToBaseRow(Row row){
+    public static BaseRow convertToBaseRow(Row row) {
         int length = row.getArity();
         GenericRow genericRow = new GenericRow(length);
-        for(int i=0; i<length; i++){
-            if(row.getField(i) instanceof String){
-                genericRow.setField(i, BinaryString.fromString((String)row.getField(i)));
-            } else if(row.getField(i) instanceof Timestamp){
-                SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp)row.getField(i)));
+        for (int i = 0; i < length; i++) {
+            if (row.getField(i) instanceof String) {
+                genericRow.setField(i, BinaryString.fromString((String) row.getField(i)));
+            } else if (row.getField(i) instanceof Timestamp) {
+                SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp) row.getField(i)));
                 genericRow.setField(i, newTimestamp);
-            }else{
+            } else if (row.getField(i) instanceof Time) {
+                genericRow.setField(i, DataFormatConverters.TimeConverter.INSTANCE.toInternal((Time) row.getField(i)));
+            } else if (row.getField(i) instanceof Double) {
+                genericRow.setField(i, DataFormatConverters.DoubleConverter.INSTANCE.toInternal((Double) row.getField(i)));
+            } else if (row.getField(i) instanceof Float) {
+                genericRow.setField(i, DataFormatConverters.FloatConverter.INSTANCE.toInternal((Float) row.getField(i)));
+            } else if (row.getField(i) instanceof Long) {
+                genericRow.setField(i, DataFormatConverters.LongConverter.INSTANCE.toInternal((Long) row.getField(i)));
+            } else if (row.getField(i) instanceof Date) {
+                genericRow.setField(i, DataFormatConverters.DateConverter.INSTANCE.toInternal((Date) row.getField(i)));
+            } else if (row.getField(i) instanceof LocalDate) {
+                genericRow.setField(i, DataFormatConverters.LocalDateConverter.INSTANCE.toInternal((LocalDate) row.getField(i)));
+            } else {
                 genericRow.setField(i, row.getField(i));
             }
         }
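
Note: the added branches route java.sql.Time/Date, LocalDate, and boxed numerics through DataFormatConverters so they land in Blink's internal representations instead of being stored as-is in the GenericRow. A minimal usage sketch with hypothetical field values:

import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.types.Row;

import java.sql.Timestamp;

import com.dtstack.flink.sql.util.RowDataConvert;

public class ConvertSketch {
    public static void main(String[] args) {
        Row row = new Row(3);
        row.setField(0, "roc");                                    // String -> BinaryString
        row.setField(1, Timestamp.valueOf("2020-09-07 14:49:10")); // Timestamp -> SqlTimestamp
        row.setField(2, java.sql.Date.valueOf("2020-09-07"));      // Date -> internal int (days since epoch)
        BaseRow baseRow = RowDataConvert.convertToBaseRow(row);
        System.out.println(baseRow.getArity()); // 3
    }
}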
core/src/test/java/com/dtstack/flink/sql/parser/SqlParserTest.java

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.parser;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @program: flink.sql
+ * @author: wuren
+ * @create: 2020/09/15
+ **/
+public class SqlParserTest {
+
+    @Test
+    public void testRemoveAddFileStmt() throws Exception {
+        List<String> rawStmts = new ArrayList<>();
+        String sql1 = " add file asdasdasd ";
+        String sql2 = " aDd fIle With asdasdasd ";
+        String sql3 = " INSERT INTO dwd_foo SELECT id, name FROM ods_foo";
+        String sql4 = " ADD FILE asb ";
+        rawStmts.add(sql1);
+        rawStmts.add(sql2);
+        rawStmts.add(sql3);
+        rawStmts.add(sql4);
+
+        List<String> stmts = Whitebox.invokeMethod(SqlParser.class, "removeAddFileStmt", rawStmts);
+        Assert.assertEquals(stmts.get(0), sql3);
+    }
+
+}

docs/plugin/hbaseSink.md

Lines changed: 4 additions & 4 deletions
@@ -7,7 +7,7 @@ CREATE TABLE MyResult(
     type ='hbase',
     zookeeperQuorum ='ip:port[,ip:port]',
     tableName ='tableName',
-    rowKey ='colName[,colName]',
+    rowKey ='colName[+colName]',
     parallelism ='1',
     zookeeperParent ='/hbase'
 )
@@ -34,7 +34,7 @@ hbase2.0
 |zookeeperQuorum | hbase zk address; separate multiple addresses with commas |||
 |zookeeperParent | zkParent path |||
 |tableName | name of the associated hbase table |||
-|rowkey | columns the hbase rowkey is built from; separate multiple values with commas |||
+|rowkey | columns the hbase rowkey is built from; join multiple values with '+' |||
 |updateMode | APPEND: no retraction, only emits incremental data; UPSERT: deletes the retracted data first, then updates |No|APPEND|
 |parallelism | parallelism setting ||1|
 |kerberosAuthEnable | whether to enable kerberos authentication ||false|
@@ -76,7 +76,7 @@ CREATE TABLE MyResult(
     tableName ='myresult',
     partitionedJoin ='false',
     parallelism ='1',
-    rowKey='name,channel'
+    rowKey='name+channel'
 );

 insert
@@ -141,7 +141,7 @@ into

 ## 6. hbase data
 ### Data description
-hbase rowkey construction rule: the declared rowKey field values form the key, with multiple fields joined by '-'
+hbase rowkey construction rule: the declared rowKey field values form the key, with multiple fields joined by '+'
 ### Data example
 hbase(main):007:0> scan 'myresult'
 ROW COLUMN+CELL
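
Note, for example (illustrative values): with rowKey='name+channel' and a result row where name is roc and channel is docker, the record is written under the hbase rowkey roc+docker.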
