Skip to content

Commit 6394dab

Browse files
committed
Merge branch '1.10_release_4.1.x' into feat_1.10_dirtyDataConsumer
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java
2 parents 023cae3 + 9edcd39 commit 6394dab

18 files changed

Lines changed: 516 additions & 355 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.dtstack.flink.sql.util.DtStringUtil;
2222
import com.google.common.base.Preconditions;
2323
import com.google.common.collect.Maps;
24-
import org.apache.commons.lang3.StringUtils;
2524

2625
import java.util.List;
2726
import java.util.Map;

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,15 @@ public abstract class BaseSideInfo implements Serializable{
7575

7676
protected AbstractSideCache sideCache;
7777

78+
protected JoinInfo joinInfo;
79+
7880
public BaseSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
7981
AbstractSideTableInfo sideTableInfo){
8082
this.rowTypeInfo = rowTypeInfo;
8183
this.outFieldInfoList = outFieldInfoList;
8284
this.joinType = joinInfo.getJoinType();
8385
this.sideTableInfo = sideTableInfo;
86+
this.joinInfo = joinInfo;
8487
parseSelectFields(joinInfo);
8588
buildEqualInfo(joinInfo, sideTableInfo);
8689
}
@@ -118,7 +121,7 @@ public void parseSelectFields(JoinInfo joinInfo){
118121
public String getTargetFieldType(String fieldName){
119122
int fieldIndex = sideTableInfo.getFieldList().indexOf(fieldName);
120123
if(fieldIndex == -1){
121-
throw new RuntimeException(sideTableInfo.getName() + "can't find field: " + fieldName);
124+
throw new RuntimeException(sideTableInfo.getName() + " can't find field: " + fieldName);
122125
}
123126

124127
return sideTableInfo.getFieldTypes()[fieldIndex];
@@ -178,9 +181,7 @@ private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTa
178181
*/
179182
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
180183
String tableName = identifier.getComponent(0).getSimple();
181-
String sideTableName = sideTableInfo.getName();
182-
String errorMsg = "only support set side table constant field, error field " + identifier;
183-
Preconditions.checkState(tableName.equals(sideTableName), errorMsg);
184+
checkSupport(identifier);
184185
String fieldName = identifier.getComponent(1).getSimple();
185186
Object constant = literal.getValue();
186187
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
@@ -194,6 +195,22 @@ private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier)
194195
predicateInfos.add(predicate);
195196
}
196197

198+
private void checkSupport(SqlIdentifier identifier) {
199+
String tableName = identifier.getComponent(0).getSimple();
200+
String sideTableName;
201+
String sideTableAlias;
202+
if (joinInfo.isLeftIsSideTable()) {
203+
sideTableName = joinInfo.getLeftTableName();
204+
sideTableAlias = joinInfo.getLeftTableAlias();
205+
} else {
206+
sideTableName = joinInfo.getRightTableName();
207+
sideTableAlias = joinInfo.getRightTableAlias();
208+
}
209+
boolean isSide = tableName.equals(sideTableName) || tableName.equals(sideTableAlias);
210+
String errorMsg = "only support set side table constant field, error field " + identifier;
211+
Preconditions.checkState(isSide, errorMsg);
212+
}
213+
197214
private void associateField(String sourceTableField, String sideTableField, SqlNode sqlNode) {
198215
String errorMsg = "can't deal equal field: " + sqlNode;
199216
equalFieldList.add(sideTableField);

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ public class JoinInfo implements Serializable {
5555

5656
private String rightTableAlias;
5757

58-
private SqlNode leftNode;
58+
private transient SqlNode leftNode;
5959

60-
private SqlNode rightNode;
60+
private transient SqlNode rightNode;
6161

62-
private SqlNode condition;
62+
private transient SqlNode condition;
6363

64-
private SqlNode selectFields;
64+
private transient SqlNode selectFields;
6565

66-
private SqlNode selectNode;
66+
private transient SqlNode selectNode;
6767

6868
private JoinType joinType;
6969

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ private void joinFun(Object pollObj,
525525
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
526526

527527
DataStream<BaseRow> adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
528+
.filter(f -> f.f0)
528529
.map(f -> RowDataConvert.convertToBaseRow(f));
529530

530531
//join side table before keyby ===> Reducing the size of each dimension table cache of async

core/src/main/java/com/dtstack/flink/sql/side/operator/SideWithAllCacheOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ private static BaseAllReqRow loadFlatMap(String sideType, String sqlRootDir, Row
6060
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
6161
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
6262
BaseAllReqRow allReqRow = loadFlatMap(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo, pluginLoadMode);
63-
return inputStream.flatMap(allReqRow);
63+
return inputStream.flatMap(allReqRow).setParallelism(sideTableInfo.getParallelism());
6464
}
6565
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919

2020
package com.dtstack.flink.sql.table;
2121

22+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2223
import com.dtstack.flink.sql.util.ClassUtil;
2324
import com.dtstack.flink.sql.util.DtStringUtil;
2425
import com.google.common.base.Preconditions;
2526
import com.google.common.collect.Maps;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.flink.api.java.tuple.Tuple2;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
import java.util.Arrays;
3033
import java.util.List;
@@ -43,6 +46,8 @@
4346

4447
public abstract class AbstractTableParser {
4548

49+
private static final Logger LOG = LoggerFactory.getLogger(AbstractTableParser.class);
50+
4651
private static final String PRIMARY_KEY = "primaryKey";
4752
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
4853
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
@@ -105,30 +110,50 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
105110
continue;
106111
}
107112

108-
Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName());
109-
String fieldName = t.f0;
110-
String fieldType = t.f1;
113+
handleKeyNotHaveAlias(fieldRow, tableInfo);
114+
}
111115

112-
Class fieldClass;
113-
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
116+
/*
117+
* check whether the field list contains the primary keys and, if not, add them to the field list.
118+
* because some NoSQL databases have no notion of a primary key, e.g. Redis, HBase, etc.
119+
*/
120+
if (tableInfo instanceof AbstractSideTableInfo) {
121+
tableInfo.getPrimaryKeys().stream()
122+
.filter(pk -> (!tableInfo.getFieldList().contains(pk)))
123+
.forEach(pk -> {
124+
try {
125+
handleKeyNotHaveAlias(String.format("%s varchar", pk.trim()), tableInfo);
126+
} catch (Exception e) {
127+
LOG.error(String.format("Add primary key to field list failed. Reason: %s", e.getMessage()));
128+
}
129+
});
130+
}
114131

115-
Matcher matcher = charTypePattern.matcher(fieldType);
116-
if (matcher.find()) {
117-
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
118-
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
119-
fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1)));
120-
} else {
121-
fieldClass = dbTypeConvertToJavaType(fieldType);
122-
}
132+
tableInfo.finish();
133+
}
123134

124-
tableInfo.addPhysicalMappings(fieldName, fieldName);
125-
tableInfo.addField(fieldName);
126-
tableInfo.addFieldClass(fieldClass);
127-
tableInfo.addFieldType(fieldType);
128-
tableInfo.addFieldExtraInfo(fieldExtraInfo);
135+
private void handleKeyNotHaveAlias(String fieldRow, AbstractTableInfo tableInfo) {
136+
Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName());
137+
String fieldName = t.f0;
138+
String fieldType = t.f1;
139+
140+
Class fieldClass;
141+
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
142+
143+
Matcher matcher = charTypePattern.matcher(fieldType);
144+
if (matcher.find()) {
145+
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
146+
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
147+
fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1)));
148+
} else {
149+
fieldClass = dbTypeConvertToJavaType(fieldType);
129150
}
130151

131-
tableInfo.finish();
152+
tableInfo.addPhysicalMappings(fieldName, fieldName);
153+
tableInfo.addField(fieldName);
154+
tableInfo.addFieldClass(fieldClass);
155+
tableInfo.addFieldType(fieldType);
156+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
132157
}
133158

134159
private Tuple2<String, String> extractType(String fieldRow, String tableName) {

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
110110
//The order of the fields defined in the data conversion table
111111
List<Object> sideVal = Lists.newArrayList();
112112
for (String key : colNames) {
113+
114+
if (!sideMap.containsKey(key)) {
115+
sideVal.add(rowKeyStr);
116+
continue;
117+
}
118+
113119
Object val = sideMap.get(key);
114120
if (val == null) {
115121
LOG.error("can't get data with column {}", key);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public void asyncGetData(String tableName, String rowKeyStr, BaseRow input, Resu
8484
//The order of the fields defined in the data conversion table
8585
List<Object> sideVal = Lists.newArrayList();
8686
for(String key : colNames){
87+
88+
if (!sideMap.containsKey(key)) {
89+
sideVal.add(rowKeyStr);
90+
continue;
91+
}
92+
8793
Object val = sideMap.get(key);
8894
if(val == null){
8995
LOG.error("can't get data with column {}", key);

0 commit comments

Comments
 (0)