Skip to content

Commit 2581a6f

Browse files
committed
Merge branch '1.10_release_4.2.x' into feat_1.10_4.2.x_optFile
2 parents 6a76b57 + 8b5b27f commit 2581a6f

5 files changed

Lines changed: 12 additions & 6 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,13 @@ private Map<String, Object> parseInputParam(BaseRow input) {
224224
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
225225
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
226226
Object equalObj = genericRow.getField(conValIndex);
227-
if (equalObj == null) {
228-
return inputParams;
229-
}
227+
// comment by tiezhu
228+
// 假设SQL中有三个主键[a, b, c],同时主键[b]的值为null,那么
229+
// inputParams中只会有主键[a]的值,主键[b, c]都不包含,导致
230+
// 后面rdb 维表替换where 条件时缺少 value,查询SQL 执行失败
231+
// if (equalObj == null) {
232+
// return inputParams;
233+
// }
230234
String columnName = sideInfo.getEqualFieldList().get(i);
231235
inputParams.put(columnName, equalObj);
232236
}

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ public static Double getDoubleVal(Object obj) {
129129
return ((BigDecimal) obj).doubleValue();
130130
} else if (obj instanceof Integer) {
131131
return ((Integer) obj).doubleValue();
132+
} else if (obj instanceof BigInteger) {
133+
return ((BigInteger) obj).doubleValue();
132134
}
133135

134136
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Double.");

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public boolean check() {
166166
Strings.isNullOrEmpty(keytab) &&
167167
Strings.isNullOrEmpty(krb5conf);
168168

169-
Preconditions.checkState(allNotSet, "xh's elasticsearch type of kerberos file is required");
169+
Preconditions.checkState(!allNotSet, "xh's elasticsearch type of kerberos file is required");
170170

171171
return true;
172172
}

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) {
150150
String sideFieldName = sideFieldName1.trim();
151151
ColumnSchema columnSchema = table.getSchema().getColumn(sideFieldName);
152152
if (null != columnSchema) {
153-
KuduUtil.setMapValue(columnSchema.getType(), oneRow, sideFieldName, result);
153+
oneRow.put(sideFieldName, result.getObject(sideFieldName));
154154
}
155155
}
156156
String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList());

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ public Deferred<List<Row>> call(RowResultIterator results) throws Exception {
301301
String sideFieldName = sideFieldName1.trim();
302302
ColumnSchema columnSchema = table.getSchema().getColumn(sideFieldName);
303303
if (null != columnSchema) {
304-
KuduUtil.setMapValue(columnSchema.getType(), oneRow, sideFieldName, result);
304+
oneRow.put(sideFieldName, result.getObject(sideFieldName));
305305
}
306306
}
307307
BaseRow row = fillData(input, oneRow);

0 commit comments

Comments
 (0)