Skip to content

Commit accef3f

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 9ca53f2 + c90aa53 commit accef3f

5 files changed

Lines changed: 31 additions & 17 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public void asyncInvoke(Row row, ResultFuture<BaseRow> resultFuture) throws Exce
194194
}
195195

196196
private Map<String, Object> parseInputParam(Row input) {
197-
Map<String, Object> inputParams = Maps.newHashMap();
197+
Map<String, Object> inputParams = Maps.newLinkedHashMap();
198198
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
199199
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
200200
Object equalObj = input.getField(conValIndex);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ protected Row fillData(Row input, Object sideInput){
9494
Row row = new Row(outFieldInfoList.size());
9595
for(Map.Entry<Integer, Integer> entry : inFieldIndex.entrySet()){
9696
Object obj = input.getField(entry.getValue());
97-
if(obj instanceof Timestamp){
98-
obj = ((Timestamp)obj).getTime();
99-
}
10097
row.setField(entry.getKey(), obj);
10198
}
10299

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@
4242
import java.time.LocalDateTime;
4343
import java.util.ArrayList;
4444
import java.util.Calendar;
45+
import java.util.HashMap;
4546
import java.util.List;
4647
import java.util.Map;
48+
import java.util.Objects;
4749
import java.util.concurrent.atomic.AtomicReference;
4850
import java.util.stream.Collectors;
4951

@@ -103,7 +105,7 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
103105
List<Integer> equalValIndex = sideInfo.getEqualValIndex();
104106
ArrayList<Object> inputParams = equalValIndex.stream()
105107
.map(value::getField)
106-
.filter(object -> null != object)
108+
.filter(Objects::nonNull)
107109
.collect(Collectors.toCollection(ArrayList::new));
108110

109111
if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) {
@@ -121,7 +123,7 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
121123
Row row = fillData(value, null);
122124
RowDataComplete.collectRow(out, row);
123125
} else if (!CollectionUtils.isEmpty(cacheList)) {
124-
cacheList.stream().forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one))));
126+
cacheList.forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one))));
125127
}
126128
}
127129

@@ -166,13 +168,17 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
166168
ResultSet resultSet = statement.executeQuery(sql);
167169

168170
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
169-
String[] fields = sideInfo.getSideTableInfo().getFieldTypes();
171+
String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes();
172+
Map<String, String> sideFieldNamesAndTypes = Maps.newHashMap();
173+
for (int i = 0; i < sideFieldNames.length; i++) {
174+
sideFieldNamesAndTypes.put(sideFieldNames[i], sideFieldTypes[i]);
175+
}
176+
170177
while (resultSet.next()) {
171178
Map<String, Object> oneRow = Maps.newHashMap();
172179
for (String fieldName : sideFieldNames) {
173180
Object object = resultSet.getObject(fieldName.trim());
174-
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
175-
object = SwitchUtil.getTarget(object, fields[fieldIndex]);
181+
object = SwitchUtil.getTarget(object, sideFieldNamesAndTypes.get(fieldName));
176182
oneRow.put(fieldName.trim(), object);
177183
}
178184

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllSideInfo.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,24 @@
1818

1919
package com.dtstack.flink.sql.side.rdb.all;
2020

21-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22-
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.BaseSideInfo;
2323
import com.dtstack.flink.sql.side.FieldInfo;
2424
import com.dtstack.flink.sql.side.JoinInfo;
2525
import com.dtstack.flink.sql.side.PredicateInfo;
26-
import com.dtstack.flink.sql.side.BaseSideInfo;
27-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2826
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2927
import com.dtstack.flink.sql.util.ParseUtils;
3028
import com.google.common.collect.Lists;
3129
import org.apache.calcite.sql.SqlNode;
3230
import org.apache.commons.collections.CollectionUtils;
3331
import org.apache.commons.lang3.StringUtils;
32+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3433
import org.slf4j.Logger;
3534
import org.slf4j.LoggerFactory;
3635

37-
import java.util.Arrays;
3836
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Objects;
3939
import java.util.stream.Collectors;
4040

4141
/**
@@ -59,7 +59,18 @@ public RdbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5959
@Override
6060
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
6161
RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo) sideTableInfo;
62-
sqlCondition = getSelectFromStatement(getTableName(rdbSideTableInfo), Arrays.asList(StringUtils.split(sideSelectFields, ",")), sideTableInfo.getPredicateInfoes());
62+
List<String> selectFields = Lists.newArrayList();
63+
Map<String, String> physicalFields = rdbSideTableInfo.getPhysicalFields();
64+
physicalFields.keySet().forEach(
65+
item -> {
66+
if (Objects.isNull(physicalFields.get(item))) {
67+
selectFields.add(quoteIdentifier(item));
68+
} else {
69+
selectFields.add(quoteIdentifier(physicalFields.get(item)) + " AS " + quoteIdentifier(item));
70+
}
71+
}
72+
);
73+
sqlCondition = getSelectFromStatement(getTableName(rdbSideTableInfo), selectFields, sideTableInfo.getPredicateInfoes());
6374
LOG.info("--------dimension sql query-------\n{}" + sqlCondition);
6475
}
6576

@@ -68,7 +79,7 @@ public String getAdditionalWhereClause() {
6879
}
6980

7081
private String getSelectFromStatement(String tableName, List<String> selectFields, List<PredicateInfo> predicateInfoes) {
71-
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
82+
String fromClause = String.join(", ", selectFields);
7283
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));
7384
String whereClause = buildWhereClause(predicateClause);
7485
return "SELECT " + fromClause + " FROM " + tableName + whereClause;

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
338338
}
339339

340340
private Map<String, Object> formatInputParam(Map<String, Object> inputParam) {
341-
Map<String, Object> result = Maps.newHashMap();
341+
Map<String, Object> result = Maps.newLinkedHashMap();
342342
inputParam.forEach((k, v) -> {
343343
result.put(k, convertDataType(v));
344344
});

0 commit comments

Comments
 (0)