Skip to content

Commit 7a001f8

Browse files
committed
[hotfix-36198][redis] 解决redis异步维表cluster模式连接异常问题
1 parent 895df3c commit 7a001f8

3 files changed

Lines changed: 32 additions & 5 deletions

File tree

hbase/hbase-side/hbase-all-side/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
<groupId>io.netty</groupId>
3434
<artifactId>netty</artifactId>
3535
</exclusion>
36+
<exclusion>
37+
<artifactId>netty-all</artifactId>
38+
<groupId>io.netty</groupId>
39+
</exclusion>
3640
</exclusions>
3741
</dependency>
3842
</dependencies>

hbase/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@
3838
<groupId>log4j</groupId>
3939
</exclusion>
4040
<exclusion>
41+
<artifactId>netty-all</artifactId>
4142
<groupId>io.netty</groupId>
43+
</exclusion>
44+
<exclusion>
4245
<artifactId>netty</artifactId>
46+
<groupId>io.netty</groupId>
4347
</exclusion>
4448
</exclusions>
4549
</dependency>

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,18 @@
3737
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
3838
import io.lettuce.core.cluster.RedisClusterClient;
3939
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
40+
import io.lettuce.core.internal.HostAndPort;
4041
import org.apache.commons.collections.MapUtils;
4142
import org.apache.commons.lang.StringUtils;
4243
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4344
import org.apache.flink.configuration.Configuration;
4445
import org.apache.flink.streaming.api.functions.async.ResultFuture;
4546
import org.apache.flink.table.dataformat.BaseRow;
4647

48+
import java.util.ArrayList;
4749
import java.util.List;
4850
import java.util.Map;
4951
import java.util.Objects;
50-
import java.util.function.Consumer;
5152
import java.util.regex.Matcher;
5253

5354
/**
@@ -133,16 +134,34 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
133134
async = connection.async();
134135
break;
135136
case CLUSTER:
136-
RedisURI clusterURI = RedisURI.create("redis://" + url);
137-
clusterURI.setPassword(password);
138-
clusterClient = RedisClusterClient.create(clusterURI);
137+
List<RedisURI> clusterURIs = buildClusterURIs(url);
138+
clusterClient = RedisClusterClient.create(clusterURIs);
139139
clusterConnection = clusterClient.connect();
140140
async = clusterConnection.async();
141141
default:
142142
break;
143143
}
144144
}
145145

146+
private List<RedisURI> buildClusterURIs(String url) {
147+
String password = redisSideTableInfo.getPassword();
148+
String database = redisSideTableInfo.getDatabase();
149+
String[] addresses = StringUtils.split(url, ",");
150+
List<RedisURI> redisURIs = new ArrayList<>(addresses.length);
151+
for (String addr : addresses){
152+
HostAndPort hostAndPort = HostAndPort.parse(addr);
153+
RedisURI redisURI = RedisURI.create(hostAndPort.hostText, hostAndPort.port);
154+
if (StringUtils.isNotEmpty(password)) {
155+
redisURI.setPassword(password);
156+
}
157+
if (StringUtils.isNotEmpty(database)) {
158+
redisURI.setDatabase(Integer.valueOf(database));
159+
}
160+
redisURIs.add(redisURI);
161+
}
162+
return redisURIs;
163+
}
164+
146165
private RedisURI.Builder buildSentinelUri(
147166
String host,
148167
String port,
@@ -171,7 +190,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
171190
if (MapUtils.isNotEmpty(values)) {
172191
try {
173192
BaseRow row = fillData(input, values);
174-
dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.SingleLine, row));
193+
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.SingleLine, values));
175194
RowDataComplete.completeBaseRow(resultFuture, row);
176195
} catch (Exception e) {
177196
dealFillDataError(input, resultFuture, e);

0 commit comments

Comments
 (0)