Skip to content

Commit 508050c

Browse files
committed
modify reqRow to a interface
1 parent 1e9b14d commit 508050c

10 files changed

Lines changed: 122 additions & 58 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* @author xuchao
3838
*/
3939

40-
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row>{
40+
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row> implements ISideReqRow {
4141

4242
protected SideInfo sideInfo;
4343

@@ -48,8 +48,6 @@ public AllReqRow(SideInfo sideInfo){
4848

4949
}
5050

51-
protected abstract Row fillData(Row input, Object sideInput);
52-
5351
protected abstract void initCache() throws SQLException;
5452

5553
protected abstract void reloadCache();

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* @author xuchao
4141
*/
4242

43-
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
43+
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
4444

4545
private static final long serialVersionUID = 2098635244857937717L;
4646

@@ -79,9 +79,6 @@ protected boolean openCache(){
7979
return sideInfo.getSideCache() != null;
8080
}
8181

82-
83-
protected abstract Row fillData(Row input, Object sideInput);
84-
8582
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
8683
if(sideInfo.getJoinType() == JoinType.LEFT){
8784
//Reserved left table data
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import org.apache.flink.types.Row;
22+
23+
/**
24+
*
25+
* Date: 2018/12/4
26+
* Company: www.dtstack.com
27+
* @author xuchao
28+
*/
29+
public interface ISideReqRow {
30+
31+
Row fillData(Row input, Object sideInput);
32+
33+
}

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public HbaseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
6666
}
6767

6868
@Override
69-
protected Row fillData(Row input, Object sideInput) {
69+
public Row fillData(Row input, Object sideInput) {
7070
Map<String, Object> sideInputList = (Map<String, Object>) sideInput;
7171
Row row = new Row(sideInfo.getOutFieldInfoList().size());
7272
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
160160
}
161161

162162
@Override
163-
protected Row fillData(Row input, Object sideInput){
163+
public Row fillData(Row input, Object sideInput){
164164

165165
List<Object> sideInputList = (List<Object>) sideInput;
166166
Row row = new Row(sideInfo.getOutFieldInfoList().size());

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public MongoAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
8080
}
8181

8282
@Override
83-
protected Row fillData(Row input, Object sideInput) {
83+
public Row fillData(Row input, Object sideInput) {
8484
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
8585
Row row = new Row(sideInfo.getOutFieldInfoList().size());
8686
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public RdbAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo>
6767
}
6868

6969
@Override
70-
protected Row fillData(Row input, Object sideInput) {
70+
public Row fillData(Row input, Object sideInput) {
7171
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
7272
Row row = new Row(sideInfo.getOutFieldInfoList().size());
7373
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {

redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.side.redis;
2020

2121
import com.dtstack.flink.sql.side.*;
22+
import com.dtstack.flink.sql.side.redis.table.RedisSideReqRow;
2223
import com.dtstack.flink.sql.side.redis.table.RedisSideTableInfo;
2324
import org.apache.calcite.sql.JoinType;
2425
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -55,34 +56,16 @@ public class RedisAllReqRow extends AllReqRow{
5556

5657
private AtomicReference<Map<String, Map<String, String>>> cacheRef = new AtomicReference<>();
5758

59+
private RedisSideReqRow redisSideReqRow;
60+
5861
public RedisAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
5962
super(new RedisAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
63+
this.redisSideReqRow = new RedisSideReqRow(super.sideInfo);
6064
}
6165

6266
@Override
63-
protected Row fillData(Row input, Object sideInput) {
64-
Map<String, String> sideInputMap = (Map<String, String>) sideInput;
65-
Row row = new Row(sideInfo.getOutFieldInfoList().size());
66-
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
67-
Object obj = input.getField(entry.getValue());
68-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
69-
70-
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
71-
obj = ((Timestamp)obj).getTime();
72-
}
73-
row.setField(entry.getKey(), obj);
74-
}
75-
76-
for(Map.Entry<Integer, Integer> entry : sideInfo.getSideFieldIndex().entrySet()){
77-
if(sideInputMap == null){
78-
row.setField(entry.getKey(), null);
79-
}else{
80-
String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
81-
row.setField(entry.getKey(), sideInputMap.get(key));
82-
}
83-
}
84-
85-
return row;
67+
public Row fillData(Row input, Object sideInput) {
68+
return redisSideReqRow.fillData(input, sideInput);
8669
}
8770

8871
@Override

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.enums.ECacheContentType;
2222
import com.dtstack.flink.sql.side.*;
2323
import com.dtstack.flink.sql.side.cache.CacheObj;
24+
import com.dtstack.flink.sql.side.redis.table.RedisSideReqRow;
2425
import com.dtstack.flink.sql.side.redis.table.RedisSideTableInfo;
2526
import io.lettuce.core.KeyValue;
2627
import io.lettuce.core.RedisClient;
@@ -60,9 +61,11 @@ public class RedisAsyncReqRow extends AsyncReqRow {
6061

6162
private RedisSideTableInfo redisSideTableInfo;
6263

64+
private RedisSideReqRow redisSideReqRow;
6365

6466
public RedisAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
6567
super(new RedisAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
68+
redisSideReqRow = new RedisSideReqRow(super.sideInfo);
6669
}
6770

6871
@Override
@@ -108,30 +111,8 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
108111
}
109112

110113
@Override
111-
protected Row fillData(Row input, Object sideInput) {
112-
Map<String, String> keyValue = (Map<String, String>) sideInput;
113-
Row row = new Row(sideInfo.getOutFieldInfoList().size());
114-
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
115-
Object obj = input.getField(entry.getValue());
116-
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
117-
118-
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
119-
obj = ((Timestamp)obj).getTime();
120-
}
121-
122-
row.setField(entry.getKey(), obj);
123-
}
124-
125-
for(Map.Entry<Integer, Integer> entry : sideInfo.getSideFieldIndex().entrySet()){
126-
if(keyValue == null){
127-
row.setField(entry.getKey(), null);
128-
}else{
129-
String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
130-
row.setField(entry.getKey(), keyValue.get(key));
131-
}
132-
}
133-
134-
return row;
114+
public Row fillData(Row input, Object sideInput) {
115+
return redisSideReqRow.fillData(input, sideInput);
135116
}
136117

137118
@Override
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.redis.table;
20+
21+
import com.dtstack.flink.sql.side.ISideReqRow;
22+
import com.dtstack.flink.sql.side.SideInfo;
23+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
24+
import org.apache.flink.types.Row;
25+
26+
import java.io.Serializable;
27+
import java.sql.Timestamp;
28+
import java.util.Map;
29+
30+
/**
31+
* redis fill row data
32+
* Date: 2018/12/4
33+
* Company: www.dtstack.com
34+
* @author xuchao
35+
*/
36+
37+
public class RedisSideReqRow implements ISideReqRow, Serializable {
38+
39+
private static final long serialVersionUID = 3751171828444748982L;
40+
41+
private SideInfo sideInfo;
42+
43+
public RedisSideReqRow(SideInfo sideInfo){
44+
this.sideInfo = sideInfo;
45+
}
46+
47+
@Override
48+
public Row fillData(Row input, Object sideInput) {
49+
Map<String, String> sideInputMap = (Map<String, String>) sideInput;
50+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
51+
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
52+
Object obj = input.getField(entry.getValue());
53+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
54+
55+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
56+
obj = ((Timestamp)obj).getTime();
57+
}
58+
row.setField(entry.getKey(), obj);
59+
}
60+
61+
for(Map.Entry<Integer, Integer> entry : sideInfo.getSideFieldIndex().entrySet()){
62+
if(sideInputMap == null){
63+
row.setField(entry.getKey(), null);
64+
}else{
65+
String key = sideInfo.getSideFieldNameIndex().get(entry.getKey());
66+
row.setField(entry.getKey(), sideInputMap.get(key));
67+
}
68+
}
69+
70+
return row;
71+
}
72+
}

0 commit comments

Comments
 (0)