|
21 | 21 | import com.dtstack.flink.sql.side.AbstractSideTableInfo; |
22 | 22 | import com.dtstack.flink.sql.side.BaseAsyncReqRow; |
23 | 23 | import com.dtstack.flink.sql.util.RowDataComplete; |
| 24 | +import io.lettuce.core.RedisURI; |
24 | 25 | import io.lettuce.core.api.async.RedisAsyncCommands; |
25 | 26 | import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; |
26 | 27 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
@@ -93,29 +94,28 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){ |
93 | 94 | } |
94 | 95 | switch (RedisType.parse(tableInfo.getRedisType())){ |
95 | 96 | case STANDALONE: |
96 | | - StringBuilder redisUri = new StringBuilder(); |
97 | | - redisUri.append("redis://").append(url).append("/").append(database); |
98 | | - redisClient = RedisClient.create(redisUri.toString()); |
| 97 | + RedisURI redisURI = RedisURI.create("redis://" + url); |
| 98 | + redisURI.setPassword(password); |
| 99 | + redisURI.setDatabase(Integer.valueOf(database)); |
| 100 | + redisClient = RedisClient.create(redisURI); |
99 | 101 | connection = redisClient.connect(); |
100 | 102 | async = connection.async(); |
101 | | - ((RedisAsyncCommands)async).auth(password); |
102 | 103 | break; |
103 | 104 | case SENTINEL: |
104 | | - StringBuilder sentinelUri = new StringBuilder(); |
105 | | - sentinelUri.append("redis-sentinel://") |
106 | | - .append(url).append("/").append(database).append("#").append(redisSideTableInfo.getMasterName()); |
107 | | - redisClient = RedisClient.create(sentinelUri.toString()); |
| 105 | + RedisURI redisSentinelURI = RedisURI.create("redis-sentinel://" + url); |
| 106 | + redisSentinelURI.setPassword(password); |
| 107 | + redisSentinelURI.setDatabase(Integer.valueOf(database)); |
| 108 | + redisSentinelURI.setSentinelMasterId(redisSideTableInfo.getMasterName()); |
| 109 | + redisClient = RedisClient.create(redisSentinelURI); |
108 | 110 | connection = redisClient.connect(); |
109 | 111 | async = connection.async(); |
110 | | - ((RedisAsyncCommands)async).auth(password); |
111 | 112 | break; |
112 | 113 | case CLUSTER: |
113 | | - StringBuilder clusterUri = new StringBuilder(); |
114 | | - clusterUri.append("redis://").append(url); |
115 | | - clusterClient = RedisClusterClient.create(clusterUri.toString()); |
| 114 | + RedisURI clusterURI = RedisURI.create("redis://" + url); |
| 115 | + clusterURI.setPassword(password); |
| 116 | + clusterClient = RedisClusterClient.create(clusterURI); |
116 | 117 | clusterConnection = clusterClient.connect(); |
117 | 118 | async = clusterConnection.async(); |
118 | | - ((RedisAdvancedClusterAsyncCommands)async).auth(password); |
119 | 119 | default: |
120 | 120 | break; |
121 | 121 | } |
|
0 commit comments