Skip to content

Commit 7caa315

Browse files
committed
fix bug 26199
1 parent ae796a0 commit 7caa315

21 files changed

Lines changed: 179 additions & 113 deletions

File tree

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,30 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21-
import org.apache.flink.api.java.tuple.Tuple2;
22-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23-
import org.apache.flink.types.Row;
24-
import org.apache.flink.util.Collector;
25-
26-
import com.datastax.driver.core.Cluster;
27-
import com.datastax.driver.core.ConsistencyLevel;
28-
import com.datastax.driver.core.HostDistance;
29-
import com.datastax.driver.core.PoolingOptions;
30-
import com.datastax.driver.core.QueryOptions;
31-
import com.datastax.driver.core.ResultSet;
32-
import com.datastax.driver.core.Session;
33-
import com.datastax.driver.core.SocketOptions;
21+
import com.datastax.driver.core.*;
3422
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3523
import com.datastax.driver.core.policies.RetryPolicy;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3625
import com.dtstack.flink.sql.side.BaseAllReqRow;
3726
import com.dtstack.flink.sql.side.FieldInfo;
3827
import com.dtstack.flink.sql.side.JoinInfo;
39-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
4028
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
29+
import com.dtstack.flink.sql.util.RowDataComplete;
4130
import com.google.common.collect.Lists;
4231
import com.google.common.collect.Maps;
4332
import org.apache.calcite.sql.JoinType;
4433
import org.apache.commons.collections.CollectionUtils;
4534
import org.apache.commons.lang3.StringUtils;
35+
import org.apache.flink.api.java.tuple.Tuple2;
36+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
37+
import org.apache.flink.table.dataformat.BaseRow;
38+
import org.apache.flink.types.Row;
39+
import org.apache.flink.util.Collector;
4640
import org.slf4j.Logger;
4741
import org.slf4j.LoggerFactory;
4842

4943
import java.net.InetAddress;
5044
import java.sql.SQLException;
51-
import java.sql.Timestamp;
5245
import java.util.ArrayList;
5346
import java.util.Calendar;
5447
import java.util.List;
@@ -124,14 +117,14 @@ protected void reloadCache() {
124117

125118

126119
@Override
127-
public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> out) throws Exception {
120+
public void flatMap(Tuple2<Boolean, Row> input, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
128121
List<Object> inputParams = Lists.newArrayList();
129122
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
130123
Object equalObj = input.f1.getField(conValIndex);
131124
if (equalObj == null) {
132125
if (sideInfo.getJoinType() == JoinType.LEFT) {
133-
Row data = fillData(input.f1, null);
134-
out.collect(Tuple2.of(input.f0, data));
126+
Row row = fillData(input.f1, null);
127+
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
135128
}
136129
return;
137130
}
@@ -144,7 +137,7 @@ public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> ou
144137
if (CollectionUtils.isEmpty(cacheList)) {
145138
if (sideInfo.getJoinType() == JoinType.LEFT) {
146139
Row row = fillData(input.f1, null);
147-
out.collect(Tuple2.of(input.f0, row));
140+
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
148141
} else {
149142
return;
150143
}
@@ -153,7 +146,8 @@ public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> ou
153146
}
154147

155148
for (Map<String, Object> one : cacheList) {
156-
out.collect(Tuple2.of(input.f0, fillData(input.f1, one)));
149+
Row row = fillData(input.f1, one);
150+
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
157151
}
158152

159153
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22+
import com.dtstack.flink.sql.util.RowDataComplete;
2223
import org.apache.flink.api.java.tuple.Tuple2;
2324
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2425
import org.apache.flink.configuration.Configuration;
2526
import org.apache.flink.streaming.api.functions.async.ResultFuture;
27+
import org.apache.flink.table.dataformat.BaseRow;
2628
import org.apache.flink.types.Row;
2729

2830
import com.datastax.driver.core.Cluster;
@@ -161,7 +163,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
161163
}
162164

163165
@Override
164-
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
166+
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
165167

166168
String key = buildCacheKey(inputParams);
167169
//connect Cassandra
@@ -200,7 +202,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
200202
}
201203
rowList.add(Tuple2.of(input.f0,row));
202204
}
203-
resultFuture.complete(rowList);
205+
RowDataComplete.completeTupleRows(resultFuture, rowList);
204206
if (openCache()) {
205207
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
206208
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
*/
1818

1919

20-
2120
package com.dtstack.flink.sql.side;
2221

22+
import com.dtstack.flink.sql.util.RowDataComplete;
2323
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2424
import org.apache.flink.api.java.tuple.Tuple2;
2525
import org.apache.flink.configuration.Configuration;
@@ -44,10 +44,11 @@
4444
* Reason:
4545
* Date: 2018/9/18
4646
* Company: www.dtstack.com
47+
*
4748
* @author xuchao
4849
*/
4950

50-
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,BaseRow>> implements ISideReqRow {
51+
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, BaseRow>> implements ISideReqRow {
5152

5253
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
5354

@@ -57,7 +58,7 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,R
5758

5859
private ScheduledExecutorService es;
5960

60-
public BaseAllReqRow(BaseSideInfo sideInfo){
61+
public BaseAllReqRow(BaseSideInfo sideInfo) {
6162
this.sideInfo = sideInfo;
6263

6364
}
@@ -74,7 +75,7 @@ public void open(Configuration parameters) throws Exception {
7475

7576
//start reload cache thread
7677
AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
77-
es = new ScheduledThreadPoolExecutor(1,new DTThreadFactory("cache-all-reload"));
78+
es = new ScheduledThreadPoolExecutor(1, new DTThreadFactory("cache-all-reload"));
7879
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
7980
}
8081

@@ -88,12 +89,12 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8889
return obj;
8990
}
9091

91-
protected void sendOutputRow(Tuple2<Boolean, Row> value, Object sideInput, Collector<Tuple2<Boolean, Row>> out) {
92+
protected void sendOutputRow(Tuple2<Boolean, Row> value, Object sideInput, Collector<Tuple2<Boolean, BaseRow>> out) {
9293
if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
9394
return;
9495
}
9596
Row row = fillData(value.f1, sideInput);
96-
out.collect(Tuple2.of(value.f0, row));
97+
RowDataComplete.collectTupleRow(out, Tuple2.of(value.f0, row));
9798
}
9899

99100
@Override

0 commit comments

Comments
 (0)