Skip to content

Commit c22c7cc

Browse files
committed
[fix-32711][core] fix nosql(hbase/redis) side join that can not find primary key.
1 parent 22499cc commit c22c7cc

1 file changed

Lines changed: 24 additions & 8 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@
6868

6969
import java.sql.Timestamp;
7070
import java.time.LocalDateTime;
71+
import java.util.ArrayList;
7172
import java.util.Arrays;
7273
import java.util.LinkedList;
7374
import java.util.List;
7475
import java.util.Map;
76+
import java.util.Optional;
7577
import java.util.Queue;
7678
import java.util.Set;
7779

@@ -317,7 +319,7 @@ private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
317319
* @param conditionNode
318320
* @param joinScope
319321
*/
320-
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope) {
322+
public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinScope, Map<String, AbstractSideTableInfo> sideTableMap) {
321323
List<SqlNode> sqlNodeList = Lists.newArrayList();
322324
ParseUtils.parseAnd(conditionNode, sqlNodeList);
323325
for (SqlNode sqlNode : sqlNodeList) {
@@ -329,11 +331,11 @@ public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinSco
329331
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
330332

331333
if (leftNode.getKind() == SqlKind.IDENTIFIER) {
332-
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode);
334+
checkFieldInTable((SqlIdentifier) leftNode, joinScope, conditionNode, sideTableMap);
333335
}
334336

335337
if (rightNode.getKind() == SqlKind.IDENTIFIER) {
336-
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode);
338+
checkFieldInTable((SqlIdentifier) rightNode, joinScope, conditionNode, sideTableMap);
337339
}
338340

339341
}
@@ -345,11 +347,11 @@ public void checkConditionFieldsInTable(SqlNode conditionNode, JoinScope joinSco
345347
* @param joinScope
346348
* @param conditionNode
347349
*/
348-
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode) {
350+
private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNode conditionNode, Map<String, AbstractSideTableInfo> sideTableMap) {
349351
String tableName = sqlNode.getComponent(0).getSimple();
350352
String fieldName = sqlNode.getComponent(1).getSimple();
351353
JoinScope.ScopeChild scopeChild = joinScope.getScope(tableName);
352-
String tableErrorMsg = "table [%s] is not exist. error condition is [%s]. if you find [%s] is exist, please check AS statement";
354+
String tableErrorMsg = "Table [%s] is not exist. Error condition is [%s]. If you find [%s] is exist. Please check AS statement.";
353355
Preconditions.checkState(
354356
scopeChild != null,
355357
tableErrorMsg,
@@ -359,8 +361,22 @@ private void checkFieldInTable(SqlIdentifier sqlNode, JoinScope joinScope, SqlNo
359361
);
360362

361363
String[] fieldNames = scopeChild.getRowTypeInfo().getFieldNames();
362-
boolean hasField = Arrays.asList(fieldNames).contains(fieldName);
363-
String fieldErrorMsg = "table [%s] has not [%s] field.\n error join condition is [%s]";
364+
ArrayList<String> allFieldNames = new ArrayList(
365+
Arrays.asList(fieldNames)
366+
);
367+
AbstractSideTableInfo sideTableInfo = sideTableMap.get(tableName);
368+
// HBase、Redis这种NoSQL Primary Key不在字段列表中,所以要加进去。
369+
if (sideTableInfo != null) {
370+
List<String> pks = sideTableInfo.getPrimaryKeys();
371+
if (pks != null) {
372+
pks.stream()
373+
.filter(pk -> !allFieldNames.contains(pk))
374+
.forEach(pk -> allFieldNames.add(pk));
375+
}
376+
}
377+
378+
boolean hasField = allFieldNames.contains(fieldName);
379+
String fieldErrorMsg = "Table [%s] has not [%s] field. Error join condition is [%s]. If you find it is exist. Please check AS statement.";
364380
Preconditions.checkState(
365381
hasField,
366382
fieldErrorMsg,
@@ -481,7 +497,7 @@ private void joinFun(Object pollObj,
481497
joinScope.addScope(rightScopeChild);
482498

483499
HashBasedTable<String, String, String> mappingTable = ((JoinInfo) pollObj).getTableFieldRef();
484-
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope);
500+
checkConditionFieldsInTable(joinInfo.getCondition(), joinScope, sideTableMap);
485501

486502
//获取两个表的所有字段
487503
List<FieldInfo> sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true);

0 commit comments

Comments
 (0)