Skip to content

Commit ea8cee4

Browse files
committed
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.5.0_dev
2 parents 70b1c77 + 84a44b2 commit ea8cee4

7 files changed

Lines changed: 20 additions & 15 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
109109
}
110110

111111
public static void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
112-
String primaryFields = matcher.group(1);
112+
String primaryFields = matcher.group(1).trim();
113113
String[] splitArry = primaryFields.split(",");
114114
List<String> primaryKes = Lists.newArrayList(splitArry);
115115
tableInfo.setPrimaryKeys(primaryKes);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
170170
StringBuilder key = new StringBuilder();
171171
key.append(family).append(":").append(qualifier);
172172

173-
kv.put(aliasNameInversion.get(key.toString().toUpperCase()), value);
173+
kv.put(aliasNameInversion.get(key.toString()), value);
174174
}
175175
tmpCache.put(new String(r.getRow()), kv);
176176
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
9898
String mapKey = cf + ":" + col;
9999

100100
//The table format defined using different data type conversion byte
101-
String colType = colRefType.get(mapKey.toUpperCase());
101+
String colType = colRefType.get(mapKey);
102102
Object val = HbaseUtils.convertByte(keyValue.value(), colType);
103-
sideMap.put(mapKey.toUpperCase(), val);
103+
sideMap.put(mapKey, val);
104104
}
105105

106106
if (oneRow.size() > 0) {

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
7373
String col = new String(keyValue.qualifier());
7474
String mapKey = cf + ":" + col;
7575
//The table format defined using different data type conversion byte
76-
String colType = colRefType.get(mapKey.toUpperCase());
76+
String colType = colRefType.get(mapKey);
7777
Object val = HbaseUtils.convertByte(keyValue.value(), colType);
78-
sideMap.put(mapKey.toUpperCase(), val);
78+
sideMap.put(mapKey, val);
7979
}
8080

8181
if(arg.size() > 0){

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
130130
Object equalObj = input.getField(conValIndex);
131131
if (equalObj == null) {
132132
resultFuture.complete(null);
133+
return;
133134
}
134135
basicDBObject.put(sideInfo.getEqualFieldList().get(i), equalObj);
135136
}
@@ -142,12 +143,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
142143
dealMissKey(input, resultFuture);
143144
return;
144145
} else if (ECacheContentType.MultiLine == val.getType()) {
145-
146+
List<Row> rowList = Lists.newArrayList();
146147
for (Object jsonArray : (List) val.getContent()) {
147148
Row row = fillData(input, jsonArray);
148-
resultFuture.complete(Collections.singleton(row));
149+
rowList.add(row);
149150
}
150-
151+
resultFuture.complete(rowList);
151152
} else {
152153
throw new RuntimeException("not support cache obj type " + val.getType());
153154
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
7575
Object equalObj = input.getField(conValIndex);
7676
if (equalObj == null) {
7777
resultFuture.complete(null);
78+
return;
7879
}
79-
8080
inputParams.add(equalObj);
8181
}
8282

@@ -89,12 +89,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
8989
dealMissKey(input, resultFuture);
9090
return;
9191
} else if (ECacheContentType.MultiLine == val.getType()) {
92-
92+
List<Row> rowList = Lists.newArrayList();
9393
for (Object jsonArray : (List) val.getContent()) {
9494
Row row = fillData(input, jsonArray);
95-
resultFuture.complete(Collections.singleton(row));
95+
rowList.add(row);
9696
}
97-
97+
resultFuture.complete(rowList);
9898
} else {
9999
throw new RuntimeException("not support cache obj type " + val.getType());
100100
}

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
141141
dealMissKey(input, resultFuture);
142142
return;
143143
}else if(ECacheContentType.MultiLine == val.getType()){
144-
Row row = fillData(input, val.getContent());
145-
resultFuture.complete(Collections.singleton(row));
144+
List<Row> rowList = Lists.newArrayList();
145+
for (Object jsonArray : (List) val.getContent()) {
146+
Row row = fillData(input, val.getContent());
147+
rowList.add(row);
148+
}
149+
resultFuture.complete(rowList);
146150
}else{
147151
throw new RuntimeException("not support cache obj type " + val.getType());
148152
}

0 commit comments

Comments
 (0)