Skip to content

Commit 716c154

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 58f6f4a + efc169d commit 716c154

2 files changed

Lines changed: 26 additions & 10 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@
6868

6969
import java.sql.Timestamp;
7070
import java.time.LocalDateTime;
71+
import java.util.ArrayList;
7172
import java.util.Arrays;
7273
import java.util.LinkedList;
7374
import java.util.List;
7475
import java.util.Map;
76+
import java.util.Optional;
7577
import java.util.Queue;
7678
import java.util.Set;
7779

@@ -317,23 +319,23 @@ private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
317319
* @param conditionNode
318320
* @param joinScope
319321
*/
320-
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope) {
322+
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope, AbstractSideTableInfo sideTableInfo) {
321323
List<SqlNode> sqlNodeList = Lists.newArrayList();
322324
ParseUtils.parseAnd(conditionNode, sqlNodeList);
323325
for (SqlNode sqlNode : sqlNodeList) {
324326
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
325-
throw new RuntimeException("not compare operator.");
327+
throw new RuntimeException("It is not comparison operator.");
326328
}
327329

328330
SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
329331
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
330332

331333
if (leftNode.getKind() == SqlKind.IDENTIFIER) {
332-
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode);
334+
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode, sideTableInfo);
333335
}
334336

335337
if (rightNode.getKind() == SqlKind.IDENTIFIER) {
336-
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode);
338+
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode, sideTableInfo);
337339
}
338340

339341
}
@@ -345,11 +347,11 @@ public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinSco
345347
* @param joinScope
346348
* @param conditionNode
347349
*/
348-
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode) {
350+
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode, AbstractSideTableInfo sideTableInfo) {
349351
String tableName = sqlNode.getComponent(0).getSimple();
350352
String fieldName = sqlNode.getComponent(1).getSimple();
351353
JoinScope.ScopeChild scopeChild = joinScope.getScope(tableName);
352-
String tableErrorMsg = "table [%s] is not exist. error condition is [%s]. if you find [%s] is exist, please check AS statement";
354+
String tableErrorMsg = "Table [%s] is not exist. Error condition is [%s]. If you find [%s] is exist. Please check AS statement.";
353355
Preconditions.checkState(
354356
scopeChild != null,
355357
tableErrorMsg,
@@ -359,8 +361,21 @@ private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNo
359361
);
360362

361363
String[] fieldNames = scopeChild.getRowTypeInfo().getFieldNames();
362-
boolean hasField = Arrays.asList(fieldNames).contains(fieldName);
363-
String fieldErrorMsg = "table [%s] has not [%s] field.\n error join condition is [%s]";
364+
ArrayList<String> allFieldNames = new ArrayList(
365+
Arrays.asList(fieldNames)
366+
);
367+
// HBase、Redis这种NoSQL Primary Key不在字段列表中,所以要加进去。
368+
if (sideTableInfo != null) {
369+
List<String> pks = sideTableInfo.getPrimaryKeys();
370+
if (pks != null) {
371+
pks.stream()
372+
.filter(pk -> !allFieldNames.contains(pk))
373+
.forEach(pk -> allFieldNames.add(pk));
374+
}
375+
}
376+
377+
boolean hasField = allFieldNames.contains(fieldName);
378+
String fieldErrorMsg = "Table [%s] has not [%s] field. Error join condition is [%s]. If you find it is exist. Please check AS statement.";
364379
Preconditions.checkState(
365380
hasField,
366381
fieldErrorMsg,
@@ -481,7 +496,8 @@ private void joinFun(Object pollObj,
481496
joinScope.addScope(rightScopeChild);
482497

483498
HashBasedTable<String, String, String> mappingTable = ((JoinInfo) pollObj).getTableFieldRef();
484-
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope);
499+
// 检查JOIN等式字段是否在原表中
500+
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope, sideTableInfo);
485501

486502
//获取两个表的所有字段
487503
List<FieldInfo> sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true);

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class AbstractTableInfo implements Serializable {
5959

6060
private List<String> primaryKeys;
6161

62-
private Integer parallelism = 1;
62+
private Integer parallelism = -1;
6363

6464
public String[] getFieldTypes() {
6565
return fieldTypes;

0 commit comments

Comments
 (0)