Skip to content

Commit 7610562

Browse files
committed
Merge branch 'feat_1.10_replaceCompleteNull' into 'v1.10.0_dev'
[feat] replace complete(null) 为 complete(Collections.EMPTY_LIST); See merge request dt-insight-engine/flinkStreamSQL!78
2 parents 363adca + fd40c57 commit 7610562

6 files changed

Lines changed: 9 additions & 9 deletions

File tree

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
211211
if (openCache()) {
212212
putCache(key, CacheMissVal.getMissKeyObj());
213213
}
214-
resultFuture.complete(null);
214+
resultFuture.complete(Collections.EMPTY_LIST);
215215
}
216216
}
217217

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
145145
dealFillDataError(input, resultFuture, e);
146146
}
147147
} else {
148-
resultFuture.complete(null);
148+
resultFuture.complete(Collections.EMPTY_LIST);
149149
}
150150
}
151151

@@ -163,14 +163,14 @@ public void timeout(Row input, ResultFuture<BaseRow> resultFuture) throws Except
163163
}
164164
timeOutNum++;
165165
if (sideInfo.getJoinType() == JoinType.LEFT) {
166-
resultFuture.complete(null);
166+
resultFuture.complete(Collections.EMPTY_LIST);
167167
return;
168168
}
169169
if (timeOutNum > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)) {
170170
resultFuture.completeExceptionally(new Exception("Async function call timedoutNum beyond limit."));
171171
return;
172172
}
173-
resultFuture.complete(null);
173+
resultFuture.complete(Collections.EMPTY_LIST);
174174
}
175175

176176
protected void preInvoke(Row input, ResultFuture<BaseRow> resultFuture)

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
8585
resultFuture.completeExceptionally(e);
8686
}
8787
} else {
88-
resultFuture.complete(null);
88+
resultFuture.complete(Collections.EMPTY_LIST);
8989
}
9090
}
9191

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
130130
resultFuture.completeExceptionally(e);
131131
}
132132
} catch (Exception e) {
133-
resultFuture.complete(null);
133+
resultFuture.complete(Collections.EMPTY_LIST);
134134
LOG.error("record:" + input);
135135
LOG.error("get side record exception:", e);
136136
}
@@ -150,7 +150,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
150150
private String dealFail(Object arg2, Row input, ResultFuture<BaseRow> resultFuture){
151151
LOG.error("record:" + input);
152152
LOG.error("get side record exception:" + arg2);
153-
resultFuture.complete(null);
153+
resultFuture.complete(Collections.EMPTY_LIST);
154154
return "";
155155
}
156156
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
118118
}, arg2 -> {
119119
LOG.error("record:" + input);
120120
LOG.error("get side record exception:" + arg2);
121-
resultFuture.complete(null);
121+
resultFuture.complete(Collections.EMPTY_LIST);
122122
return "";
123123
});
124124
}

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void apply(final Document document) {
133133
public void onResult(final Void result, final Throwable t) {
134134
if (atomicInteger.get() <= 0) {
135135
LOG.warn("Cannot retrieve the data from the database");
136-
resultFuture.complete(null);
136+
resultFuture.complete(Collections.EMPTY_LIST);
137137
} else {
138138
if (openCache()) {
139139
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));

0 commit comments

Comments
 (0)