Skip to content

Commit 2c1186c

Browse files
committed
all plugins row2baserow
1 parent 7480358 commit 2c1186c

16 files changed

Lines changed: 110 additions & 89 deletions

File tree

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@ protected void reloadCache() {
117117

118118

119119
@Override
120-
public void flatMap(Tuple2<Boolean, Row> input, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
120+
public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
121121
List<Object> inputParams = Lists.newArrayList();
122122
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
123-
Object equalObj = input.f1.getField(conValIndex);
123+
Object equalObj = input.getField(conValIndex);
124124
if (equalObj == null) {
125125
if (sideInfo.getJoinType() == JoinType.LEFT) {
126-
Row row = fillData(input.f1, null);
127-
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
126+
Row row = fillData(input, null);
127+
RowDataComplete.collectRow(out, row);
128128
}
129129
return;
130130
}
@@ -136,8 +136,8 @@ public void flatMap(Tuple2<Boolean, Row> input, Collector<Tuple2<Boolean, BaseRo
136136
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
137137
if (CollectionUtils.isEmpty(cacheList)) {
138138
if (sideInfo.getJoinType() == JoinType.LEFT) {
139-
Row row = fillData(input.f1, null);
140-
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
139+
Row row = fillData(input, null);
140+
RowDataComplete.collectRow(out, row);
141141
} else {
142142
return;
143143
}
@@ -146,8 +146,8 @@ public void flatMap(Tuple2<Boolean, Row> input, Collector<Tuple2<Boolean, BaseRo
146146
}
147147

148148
for (Map<String, Object> one : cacheList) {
149-
Row row = fillData(input.f1, one);
150-
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
149+
Row row = fillData(input, one);
150+
RowDataComplete.collectRow(out, row);
151151
}
152152

153153
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
163163
}
164164

165165
@Override
166-
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
166+
public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
167167

168168
String key = buildCacheKey(inputParams);
169169
//connect Cassandra
@@ -194,15 +194,15 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
194194
cluster.closeAsync();
195195
if (rows.size() > 0) {
196196
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
197-
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
197+
List<Row> rowList = Lists.newArrayList();
198198
for (com.datastax.driver.core.Row line : rows) {
199-
Row row = fillData(input.f1, line);
199+
Row row = fillData(input, line);
200200
if (openCache()) {
201201
cacheContent.add(line);
202202
}
203-
rowList.add(Tuple2.of(input.f0,row));
203+
rowList.add(row);
204204
}
205-
RowDataComplete.completeTupleRows(resultFuture, rowList);
205+
RowDataComplete.completeRow(resultFuture, rowList);
206206
if (openCache()) {
207207
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
208208
}

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ public Elasticsearch6AllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<
7373
}
7474

7575
@Override
76-
public void flatMap(Tuple2<Boolean,Row> value, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
76+
public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
7777
List<Object> inputParams = Lists.newArrayList();
7878
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
79-
Object equalObj = value.f1.getField(conValIndex);
79+
Object equalObj = value.getField(conValIndex);
8080
if (equalObj == null) {
8181
sendOutputRow(value, null, out);
8282
return;

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void open(Configuration parameters) throws Exception {
8383

8484

8585
@Override
86-
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
86+
public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
8787
String key = buildCacheKey(inputParams);
8888
BoolQueryBuilder boolQueryBuilder = Es6Util.setPredicateclause(sideInfo);
8989
boolQueryBuilder = setInputParams(inputParams, boolQueryBuilder);
@@ -99,7 +99,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Ro
9999
public void onResponse(SearchResponse searchResponse) {
100100

101101
List<Object> cacheContent = Lists.newArrayList();
102-
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
102+
List<Row> rowList = Lists.newArrayList();
103103
SearchHit[] searchHits = searchResponse.getHits().getHits();
104104
if (searchHits.length > 0) {
105105
Elasticsearch6SideTableInfo tableInfo = null;
@@ -123,7 +123,7 @@ public void onResponse(SearchResponse searchResponse) {
123123
searchHits = searchResponse.getHits().getHits();
124124
}
125125
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
126-
RowDataComplete.completeTupleRows(resultFuture, rowList);
126+
RowDataComplete.completeRow(resultFuture, rowList);
127127
} catch (Exception e) {
128128
dealFillDataError(input, resultFuture, e);
129129
} finally {
@@ -161,7 +161,7 @@ public String buildCacheKey(Map<String, Object> inputParams) {
161161
return sb.toString();
162162
}
163163

164-
private void loadDataToCache(SearchHit[] searchHits, List<Tuple2<Boolean,Row>> rowList, List<Object> cacheContent, Tuple2<Boolean,Row> copyCrow) {
164+
private void loadDataToCache(SearchHit[] searchHits, List<Row> rowList, List<Object> cacheContent, Row copyCrow) {
165165
List<Object> results = Lists.newArrayList();
166166
for (SearchHit searchHit : searchHits) {
167167
Map<String, Object> object = searchHit.getSourceAsMap();
@@ -170,14 +170,14 @@ private void loadDataToCache(SearchHit[] searchHits, List<Tuple2<Boolean,Row>> r
170170
rowList.addAll(getRows(copyCrow, cacheContent, results));
171171
}
172172

173-
protected List<Tuple2<Boolean, Row>> getRows(Tuple2<Boolean, Row> inputRow, List<Object> cacheContent, List<Object> results) {
174-
List<Tuple2<Boolean, Row>> rowList = Lists.newArrayList();
173+
protected List<Row> getRows(Row inputRow, List<Object> cacheContent, List<Object> results) {
174+
List<Row> rowList = Lists.newArrayList();
175175
for (Object line : results) {
176-
Row row = fillData(inputRow.f1, line);
176+
Row row = fillData(inputRow, line);
177177
if (null != cacheContent && openCache()) {
178178
cacheContent.add(line);
179179
}
180-
rowList.add(Tuple2.of(inputRow.f0, row));
180+
rowList.add(row);
181181
}
182182
return rowList;
183183
}

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,15 @@ protected void reloadCache() {
140140
}
141141

142142
@Override
143-
public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
143+
public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
144144
Map<String, Object> refData = Maps.newHashMap();
145145
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
146146
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
147-
Object equalObj = input.f1.getField(conValIndex);
147+
Object equalObj = input.getField(conValIndex);
148148
if (equalObj == null) {
149149
if (sideInfo.getJoinType() == JoinType.LEFT) {
150-
Row data = fillData(input.f1, null);
151-
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, data));
150+
Row data = fillData(input, null);
151+
RowDataComplete.collectRow(out, data);
152152
}
153153
return;
154154
}
@@ -165,14 +165,14 @@ public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean, BaseRow
165165
for (Map.Entry<String, Map<String, Object>> entry : cacheRef.get().entrySet()) {
166166
if (entry.getKey().startsWith(rowKeyStr)) {
167167
cacheList = cacheRef.get().get(entry.getKey());
168-
Row row = fillData(input.f1, cacheList);
169-
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
168+
Row row = fillData(input, cacheList);
169+
RowDataComplete.collectRow(out, row);
170170
}
171171
}
172172
} else {
173173
cacheList = cacheRef.get().get(rowKeyStr);
174-
Row row = fillData(input.f1, cacheList);
175-
RowDataComplete.collectTupleRow(out, new Tuple2<>(input.f0, row));
174+
Row row = fillData(input, cacheList);
175+
RowDataComplete.collectRow(out, row);
176176
}
177177

178178
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void open(Configuration parameters) throws Exception {
146146
}
147147

148148
@Override
149-
public void handleAsyncInvoke(Map<String, Object> inputParams, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture) throws Exception {
149+
public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
150150
rowKeyMode.asyncGetData(tableName, buildCacheKey(inputParams), input, resultFuture, sideInfo.getSideCache());
151151
}
152152

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,16 @@ public AbstractRowKeyModeDealer(Map<String, String> colRefType, String[] colName
7575
this.sideFieldIndex = sideFieldIndex;
7676
}
7777

78-
protected void dealMissKey(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture){
79-
if(joinType == JoinType.LEFT){
78+
protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
79+
if (joinType == JoinType.LEFT) {
8080
try {
8181
//保留left 表数据
82-
Row row = fillData(input.f1, null);
83-
RowDataComplete.completeTupleRows(resultFuture, Collections.singleton(Tuple2.of(input.f0,row)));
82+
Row row = fillData(input, null);
83+
RowDataComplete.completeRow(resultFuture, row);
8484
} catch (Exception e) {
8585
resultFuture.completeExceptionally(e);
8686
}
87-
}else{
87+
} else {
8888
resultFuture.complete(null);
8989
}
9090
}
@@ -112,6 +112,6 @@ protected Row fillData(Row input, Object sideInput){
112112
return row;
113113
}
114114

115-
public abstract void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture,
115+
public abstract void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFuture<BaseRow> resultFuture,
116116
AbstractSideCache sideCache);
117117
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public PreRowKeyModeDealerDealer(Map<String, String> colRefType, String[] colNam
6767
}
6868

6969
@Override
70-
public void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean, Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture,
70+
public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFuture<BaseRow> resultFuture,
7171
AbstractSideCache sideCache) {
7272
Scanner prefixScanner = hBaseClient.newScanner(tableName);
7373
ScanFilter scanFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.UTF8(rowKeyStr)));
@@ -81,8 +81,8 @@ public void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean, Row
8181
}
8282

8383

84-
private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr, Tuple2<Boolean,Row> input,
85-
ResultFuture<Tuple2<Boolean,BaseRow> > resultFuture, AbstractSideCache sideCache) {
84+
private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr, Row input,
85+
ResultFuture<BaseRow> resultFuture, AbstractSideCache sideCache) {
8686
if(args == null || args.size() == 0){
8787
dealMissKey(input, resultFuture);
8888
if (openCache) {
@@ -91,7 +91,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
9191
}
9292

9393
List<Object> cacheContent = Lists.newArrayList();
94-
List<Tuple2<Boolean,Row> > rowList = Lists.newArrayList();
94+
List<Row> rowList = Lists.newArrayList();
9595

9696
for(List<KeyValue> oneRow : args){
9797
try {
@@ -119,12 +119,12 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
119119
sideVal.add(val);
120120
}
121121

122-
Row row = fillData(input.f1, sideVal);
122+
Row row = fillData(input, sideVal);
123123
if (openCache) {
124124
cacheContent.add(sideVal);
125125
}
126126

127-
rowList.add(Tuple2.of(input.f0,row));
127+
rowList.add(row);
128128
}
129129
}catch (Exception e) {
130130
resultFuture.completeExceptionally(e);
@@ -137,7 +137,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
137137
}
138138

139139
if (rowList.size() > 0){
140-
RowDataComplete.completeTupleRows(resultFuture, rowList);
140+
RowDataComplete.completeRow(resultFuture, rowList);
141141
}
142142

143143
if(openCache){
@@ -147,7 +147,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
147147
return "";
148148
}
149149

150-
private String dealFail(Object arg2, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,BaseRow>> resultFuture){
150+
private String dealFail(Object arg2, Row input, ResultFuture<BaseRow> resultFuture){
151151
LOG.error("record:" + input);
152152
LOG.error("get side record exception:" + arg2);
153153
resultFuture.complete(null);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public RowKeyEqualModeDealer(Map<String, String> colRefType, String[] colNames,
6363

6464

6565
@Override
66-
public void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean, BaseRow>> resultFuture,
66+
public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFuture<BaseRow> resultFuture,
6767
AbstractSideCache sideCache){
6868
//TODO 是否有查询多个col family 和多个col的方法
6969
GetRequest getRequest = new GetRequest(tableName, rowKeyStr);
@@ -94,11 +94,11 @@ public void asyncGetData(String tableName, String rowKeyStr, Tuple2<Boolean,Row>
9494
sideVal.add(val);
9595
}
9696

97-
Row row = fillData(input.f1, sideVal);
97+
Row row = fillData(input, sideVal);
9898
if(openCache){
9999
sideCache.putCache(rowKeyStr, CacheObj.buildCacheObj(ECacheContentType.SingleLine, sideVal));
100100
}
101-
RowDataComplete.completeTupleRows(resultFuture, Collections.singleton(Tuple2.of(input.f0, row)));
101+
RowDataComplete.completeRow(resultFuture, row);
102102
} catch (Exception e) {
103103
resultFuture.completeExceptionally(e);
104104
}

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ protected void reloadCache() {
107107

108108

109109
@Override
110-
public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean, BaseRow>> out) throws Exception {
110+
public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
111111
List<Object> inputParams = Lists.newArrayList();
112112
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
113-
Object equalObj = input.f1.getField(conValIndex);
113+
Object equalObj = input.getField(conValIndex);
114114
if (equalObj == null) {
115115
out.collect(null);
116116
}
@@ -121,15 +121,15 @@ public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean, BaseRow
121121
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
122122
if (CollectionUtils.isEmpty(cacheList)) {
123123
if (sideInfo.getJoinType() == JoinType.LEFT) {
124-
Row row = fillData(input.f1, null);
125-
RowDataComplete.collectTupleRow(out, Tuple2.of(input.f0, row));
124+
Row row = fillData(input, null);
125+
RowDataComplete.collectRow(out, row);
126126
}
127127
return;
128128
}
129129

130130
for (Map<String, Object> one : cacheList) {
131-
Row row = fillData(input.f1, one);
132-
RowDataComplete.collectTupleRow(out, Tuple2.of(input.f0, row));
131+
Row row = fillData(input, one);
132+
RowDataComplete.collectRow(out, row);
133133
}
134134
}
135135

0 commit comments

Comments
 (0)