Skip to content

Commit 999109b

Browse files
committed
Merge remote-tracking branch 'origin/hotfix_1.10_26199' into v1.10.0_dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
2 parents 7a3e1de + 5f01dd5 commit 999109b

26 files changed

Lines changed: 438 additions & 194 deletions

File tree

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,30 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21-
import org.apache.flink.api.java.tuple.Tuple2;
22-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23-
import org.apache.flink.types.Row;
24-
import org.apache.flink.util.Collector;
25-
26-
import com.datastax.driver.core.Cluster;
27-
import com.datastax.driver.core.ConsistencyLevel;
28-
import com.datastax.driver.core.HostDistance;
29-
import com.datastax.driver.core.PoolingOptions;
30-
import com.datastax.driver.core.QueryOptions;
31-
import com.datastax.driver.core.ResultSet;
32-
import com.datastax.driver.core.Session;
33-
import com.datastax.driver.core.SocketOptions;
21+
import com.datastax.driver.core.*;
3422
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3523
import com.datastax.driver.core.policies.RetryPolicy;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3625
import com.dtstack.flink.sql.side.BaseAllReqRow;
3726
import com.dtstack.flink.sql.side.FieldInfo;
3827
import com.dtstack.flink.sql.side.JoinInfo;
39-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
4028
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
29+
import com.dtstack.flink.sql.util.RowDataComplete;
4130
import com.google.common.collect.Lists;
4231
import com.google.common.collect.Maps;
4332
import org.apache.calcite.sql.JoinType;
4433
import org.apache.commons.collections.CollectionUtils;
4534
import org.apache.commons.lang3.StringUtils;
35+
import org.apache.flink.api.java.tuple.Tuple2;
36+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
37+
import org.apache.flink.table.dataformat.BaseRow;
38+
import org.apache.flink.types.Row;
39+
import org.apache.flink.util.Collector;
4640
import org.slf4j.Logger;
4741
import org.slf4j.LoggerFactory;
4842

4943
import java.net.InetAddress;
5044
import java.sql.SQLException;
51-
import java.sql.Timestamp;
5245
import java.util.ArrayList;
5346
import java.util.Calendar;
5447
import java.util.List;
@@ -124,14 +117,14 @@ protected void reloadCache() {
124117

125118

126119
@Override
127-
public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> out) throws Exception {
120+
public void flatMap(Tuple2<Boolean, Row> input, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
128121
List<Object> inputParams = Lists.newArrayList();
129122
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
130123
Object equalObj = input.f1.getField(conValIndex);
131124
if (equalObj == null) {
132125
if (sideInfo.getJoinType() == JoinType.LEFT) {
133-
Row data = fillData(input.f1, null);
134-
out.collect(Tuple2.of(input.f0, data));
126+
Row row = fillData(input.f1, null);
127+
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
135128
}
136129
return;
137130
}
@@ -144,7 +137,7 @@ public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> ou
144137
if (CollectionUtils.isEmpty(cacheList)) {
145138
if (sideInfo.getJoinType() == JoinType.LEFT) {
146139
Row row = fillData(input.f1, null);
147-
out.collect(Tuple2.of(input.f0, row));
140+
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
148141
} else {
149142
return;
150143
}
@@ -153,7 +146,8 @@ public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> ou
153146
}
154147

155148
for (Map<String, Object> one : cacheList) {
156-
out.collect(Tuple2.of(input.f0, fillData(input.f1, one)));
149+
Row row = fillData(input.f1, one);
150+
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
157151
}
158152

159153
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22+
import com.dtstack.flink.sql.util.RowDataComplete;
2223
import org.apache.flink.api.java.tuple.Tuple2;
2324
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2425
import org.apache.flink.configuration.Configuration;
2526
import org.apache.flink.streaming.api.functions.async.ResultFuture;
27+
import org.apache.flink.table.dataformat.BaseRow;
2628
import org.apache.flink.types.Row;
2729

2830
import com.datastax.driver.core.Cluster;
@@ -161,7 +163,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
161163
}
162164

163165
@Override
164-
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
166+
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
165167

166168
String key = buildCacheKey(inputParams);
167169
//connect Cassandra
@@ -200,7 +202,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
200202
}
201203
rowList.add(Tuple2.of(input.f0,row));
202204
}
203-
resultFuture.complete(rowList);
205+
RowDataComplete.completeTupleRows(resultFuture, rowList);
204206
if (openCache()) {
205207
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
206208
}

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,18 @@
2424
import com.google.common.collect.Lists;
2525
import org.apache.flink.api.common.typeinfo.TypeInformation;
2626
import org.apache.flink.api.java.typeutils.RowTypeInfo;
27+
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
28+
import org.apache.flink.table.types.DataType;
29+
import org.apache.flink.table.types.logical.DecimalType;
30+
import org.apache.flink.table.types.logical.LogicalType;
31+
import org.apache.flink.table.types.utils.ClassDataTypeConverter;
32+
import org.apache.flink.table.types.utils.TypeConversions;
2733

2834
import java.io.Serializable;
35+
import java.math.BigDecimal;
2936
import java.util.List;
3037
import java.util.Objects;
38+
import java.util.Optional;
3139

3240
/**
3341
* Reason:
@@ -96,6 +104,27 @@ public RowTypeInfo getRowTypeInfo(){
96104
return new RowTypeInfo(types, fieldNames);
97105
}
98106

107+
public BaseRowTypeInfo getBaseRowTypeInfo(){
108+
String[] fieldNames = getFields();
109+
Class[] fieldClass = getFieldClasses();
110+
LogicalType[] logicalTypes = new LogicalType[fieldClass.length];
111+
for (int i = 0; i < fieldClass.length; i++) {
112+
if(fieldClass[i].getName().equals(BigDecimal.class.getName())){
113+
logicalTypes[i] = new DecimalType(DecimalType.MAX_PRECISION, 18);
114+
continue;
115+
}
116+
117+
Optional<DataType> optionalDataType = ClassDataTypeConverter.extractDataType(fieldClass[i]);
118+
if(!optionalDataType.isPresent()){
119+
throw new RuntimeException(String.format("not support table % field %s type %s", getName(), fieldNames[i], fieldClass[i]));
120+
}
121+
122+
logicalTypes[i] = optionalDataType.get().getLogicalType();
123+
}
124+
125+
return new BaseRowTypeInfo(logicalTypes, fieldNames);
126+
}
127+
99128
public String getCacheType() {
100129
return cacheType;
101130
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
*/
1818

1919

20-
2120
package com.dtstack.flink.sql.side;
2221

22+
import com.dtstack.flink.sql.util.RowDataComplete;
2323
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2424
import org.apache.flink.api.java.tuple.Tuple2;
2525
import org.apache.flink.configuration.Configuration;
26+
import org.apache.flink.table.dataformat.BaseRow;
2627
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2728
import org.apache.flink.types.Row;
2829
import org.apache.flink.util.Collector;
@@ -43,10 +44,11 @@
4344
* Reason:
4445
* Date: 2018/9/18
4546
* Company: www.dtstack.com
47+
*
4648
* @author xuchao
4749
*/
4850

49-
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,Row>> implements ISideReqRow {
51+
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, BaseRow>> implements ISideReqRow {
5052

5153
private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
5254

@@ -56,7 +58,7 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,R
5658

5759
private ScheduledExecutorService es;
5860

59-
public BaseAllReqRow(BaseSideInfo sideInfo){
61+
public BaseAllReqRow(BaseSideInfo sideInfo) {
6062
this.sideInfo = sideInfo;
6163

6264
}
@@ -73,7 +75,7 @@ public void open(Configuration parameters) throws Exception {
7375

7476
//start reload cache thread
7577
AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
76-
es = new ScheduledThreadPoolExecutor(1,new DTThreadFactory("cache-all-reload"));
78+
es = new ScheduledThreadPoolExecutor(1, new DTThreadFactory("cache-all-reload"));
7779
es.scheduleAtFixedRate(() -> reloadCache(), sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
7880
}
7981

@@ -87,12 +89,12 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8789
return obj;
8890
}
8991

90-
protected void sendOutputRow(Tuple2<Boolean, Row> value, Object sideInput, Collector<Tuple2<Boolean, Row>> out) {
92+
protected void sendOutputRow(Tuple2<Boolean, Row> value, Object sideInput, Collector<Tuple2<Boolean, BaseRow>> out) {
9193
if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
9294
return;
9395
}
9496
Row row = fillData(value.f1, sideInput);
95-
out.collect(Tuple2.of(value.f0, row));
97+
RowDataComplete.collectTupleRow(out, Tuple2.of(value.f0, row));
9698
}
9799

98100
@Override

0 commit comments

Comments
 (0)