|
21 | 21 | import com.dtstack.flink.sql.side.AbstractSideTableInfo; |
22 | 22 | import com.dtstack.flink.sql.side.BaseAsyncReqRow; |
23 | 23 | import com.dtstack.flink.sql.util.RowDataComplete; |
| 24 | +import io.lettuce.core.RedisURI; |
| 25 | +import io.lettuce.core.api.async.RedisAsyncCommands; |
| 26 | +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; |
24 | 27 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
25 | 28 | import org.apache.flink.configuration.Configuration; |
26 | 29 | import org.apache.flink.streaming.api.functions.async.ResultFuture; |
@@ -84,35 +87,33 @@ public void open(Configuration parameters) throws Exception { |
84 | 87 | private void buildRedisClient(RedisSideTableInfo tableInfo){ |
85 | 88 | String url = redisSideTableInfo.getUrl(); |
86 | 89 | String password = redisSideTableInfo.getPassword(); |
87 | | - if (password != null){ |
88 | | - password = password + "@"; |
89 | | - } else { |
90 | | - password = ""; |
91 | | - } |
| 90 | + |
92 | 91 | String database = redisSideTableInfo.getDatabase(); |
93 | 92 | if (database == null){ |
94 | 93 | database = "0"; |
95 | 94 | } |
96 | 95 | switch (RedisType.parse(tableInfo.getRedisType())){ |
97 | 96 | case STANDALONE: |
98 | | - StringBuilder redisUri = new StringBuilder(); |
99 | | - redisUri.append("redis://").append(password).append(url).append("/").append(database); |
100 | | - redisClient = RedisClient.create(redisUri.toString()); |
| 97 | + RedisURI redisURI = RedisURI.create("redis://" + url); |
| 98 | + redisURI.setPassword(password); |
| 99 | + redisURI.setDatabase(Integer.valueOf(database)); |
| 100 | + redisClient = RedisClient.create(redisURI); |
101 | 101 | connection = redisClient.connect(); |
102 | 102 | async = connection.async(); |
103 | 103 | break; |
104 | 104 | case SENTINEL: |
105 | | - StringBuilder sentinelUri = new StringBuilder(); |
106 | | - sentinelUri.append("redis-sentinel://").append(password) |
107 | | - .append(url).append("/").append(database).append("#").append(redisSideTableInfo.getMasterName()); |
108 | | - redisClient = RedisClient.create(sentinelUri.toString()); |
| 105 | + RedisURI redisSentinelURI = RedisURI.create("redis-sentinel://" + url); |
| 106 | + redisSentinelURI.setPassword(password); |
| 107 | + redisSentinelURI.setDatabase(Integer.valueOf(database)); |
| 108 | + redisSentinelURI.setSentinelMasterId(redisSideTableInfo.getMasterName()); |
| 109 | + redisClient = RedisClient.create(redisSentinelURI); |
109 | 110 | connection = redisClient.connect(); |
110 | 111 | async = connection.async(); |
111 | 112 | break; |
112 | 113 | case CLUSTER: |
113 | | - StringBuilder clusterUri = new StringBuilder(); |
114 | | - clusterUri.append("redis://").append(password).append(url); |
115 | | - clusterClient = RedisClusterClient.create(clusterUri.toString()); |
| 114 | + RedisURI clusterURI = RedisURI.create("redis://" + url); |
| 115 | + clusterURI.setPassword(password); |
| 116 | + clusterClient = RedisClusterClient.create(clusterURI); |
116 | 117 | clusterConnection = clusterClient.connect(); |
117 | 118 | async = clusterConnection.async(); |
118 | 119 | default: |
|
0 commit comments