Skip to content

Commit a0514d8

Browse files
author
gituser
committed
Merge branch '1.10_release_4.1.x' into 1.10_release_4.2.x
2 parents bd74aa3 + b954998 commit a0514d8

4 files changed

Lines changed: 34 additions & 6 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public abstract class AbstractTableInfo implements Serializable {
7171
* error data limit. Task will failed once {@link AbstractDtRichOutputFormat#outDirtyRecords}
7272
* count over limit. Default 1000L;
7373
*/
74-
private Long errorLimit = 0L;
74+
// TODO: 暂时默认值设置为10000,后面排期支持在产品上设置
75+
private Long errorLimit = 10000L;
7576

7677
public String[] getFieldTypes() {
7778
return fieldTypes;

hbase/hbase-side/hbase-all-side/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
<groupId>io.netty</groupId>
3434
<artifactId>netty</artifactId>
3535
</exclusion>
36+
<exclusion>
37+
<artifactId>netty-all</artifactId>
38+
<groupId>io.netty</groupId>
39+
</exclusion>
3640
</exclusions>
3741
</dependency>
3842
</dependencies>

hbase/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@
3838
<groupId>log4j</groupId>
3939
</exclusion>
4040
<exclusion>
41+
<artifactId>netty-all</artifactId>
4142
<groupId>io.netty</groupId>
43+
</exclusion>
44+
<exclusion>
4245
<artifactId>netty</artifactId>
46+
<groupId>io.netty</groupId>
4347
</exclusion>
4448
</exclusions>
4549
</dependency>

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,18 @@
3737
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
3838
import io.lettuce.core.cluster.RedisClusterClient;
3939
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
40+
import io.lettuce.core.internal.HostAndPort;
4041
import org.apache.commons.collections.MapUtils;
4142
import org.apache.commons.lang.StringUtils;
4243
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4344
import org.apache.flink.configuration.Configuration;
4445
import org.apache.flink.streaming.api.functions.async.ResultFuture;
4546
import org.apache.flink.table.dataformat.BaseRow;
4647

48+
import java.util.ArrayList;
4749
import java.util.List;
4850
import java.util.Map;
4951
import java.util.Objects;
50-
import java.util.function.Consumer;
5152
import java.util.regex.Matcher;
5253

5354
/**
@@ -133,16 +134,34 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
133134
async = connection.async();
134135
break;
135136
case CLUSTER:
136-
RedisURI clusterURI = RedisURI.create("redis://" + url);
137-
clusterURI.setPassword(password);
138-
clusterClient = RedisClusterClient.create(clusterURI);
137+
List<RedisURI> clusterURIs = buildClusterURIs(url);
138+
clusterClient = RedisClusterClient.create(clusterURIs);
139139
clusterConnection = clusterClient.connect();
140140
async = clusterConnection.async();
141141
default:
142142
break;
143143
}
144144
}
145145

146+
private List<RedisURI> buildClusterURIs(String url) {
147+
String password = redisSideTableInfo.getPassword();
148+
String database = redisSideTableInfo.getDatabase();
149+
String[] addresses = StringUtils.split(url, ",");
150+
List<RedisURI> redisURIs = new ArrayList<>(addresses.length);
151+
for (String addr : addresses){
152+
HostAndPort hostAndPort = HostAndPort.parse(addr);
153+
RedisURI redisURI = RedisURI.create(hostAndPort.hostText, hostAndPort.port);
154+
if (StringUtils.isNotEmpty(password)) {
155+
redisURI.setPassword(password);
156+
}
157+
if (StringUtils.isNotEmpty(database)) {
158+
redisURI.setDatabase(Integer.valueOf(database));
159+
}
160+
redisURIs.add(redisURI);
161+
}
162+
return redisURIs;
163+
}
164+
146165
private RedisURI.Builder buildSentinelUri(
147166
String host,
148167
String port,
@@ -171,7 +190,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
171190
if (MapUtils.isNotEmpty(values)) {
172191
try {
173192
BaseRow row = fillData(input, values);
174-
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
193+
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.SingleLine, values));
175194
RowDataComplete.completeBaseRow(resultFuture, row);
176195
} catch (Exception e) {
177196
dealFillDataError(input, resultFuture, e);

0 commit comments

Comments
 (0)