Skip to content

Commit 84a44b2

Browse files
committed
mongo rdb syncside bug fix
1 parent 12bd803 commit 84a44b2

2 files changed

Lines changed: 5 additions & 4 deletions

File tree

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
130130
Object equalObj = input.getField(conValIndex);
131131
if (equalObj == null) {
132132
resultFuture.complete(null);
133+
return;
133134
}
134135
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);
135136
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
7575
Object equalObj = input.getField(conValIndex);
7676
if (equalObj == null) {
7777
resultFuture.complete(null);
78+
return;
7879
}
79-
8080
inputParams.add(equalObj);
8181
}
8282

@@ -89,12 +89,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
8989
dealMissKey(input, resultFuture);
9090
return;
9191
} else if (ECacheContentType.MultiLine == val.getType()) {
92-
92+
List<Row> rowList = Lists.newArrayList();
9393
for (Object jsonArray : (List) val.getContent()) {
9494
Row row = fillData(input, jsonArray);
95-
resultFuture.complete(Collections.singleton(row));
95+
rowList.add(row);
9696
}
97-
97+
resultFuture.complete(rowList);
9898
} else {
9999
throw new RuntimeException("not support cache obj type " + val.getType());
100100
}

0 commit comments

Comments
 (0)