|
37 | 37 | import io.lettuce.core.api.async.RedisKeyAsyncCommands; |
38 | 38 | import io.lettuce.core.cluster.RedisClusterClient; |
39 | 39 | import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; |
| 40 | +import io.lettuce.core.internal.HostAndPort; |
40 | 41 | import org.apache.commons.collections.MapUtils; |
41 | 42 | import org.apache.commons.lang.StringUtils; |
42 | 43 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
43 | 44 | import org.apache.flink.configuration.Configuration; |
44 | 45 | import org.apache.flink.streaming.api.functions.async.ResultFuture; |
45 | 46 | import org.apache.flink.table.dataformat.BaseRow; |
46 | 47 |
|
| 48 | +import java.util.ArrayList; |
47 | 49 | import java.util.List; |
48 | 50 | import java.util.Map; |
49 | 51 | import java.util.Objects; |
50 | | -import java.util.function.Consumer; |
51 | 52 | import java.util.regex.Matcher; |
52 | 53 |
|
53 | 54 | /** |
@@ -133,16 +134,34 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){ |
133 | 134 | async = connection.async(); |
134 | 135 | break; |
135 | 136 | case CLUSTER: |
136 | | - RedisURI clusterURI = RedisURI.create("redis://" + url); |
137 | | - clusterURI.setPassword(password); |
138 | | - clusterClient = RedisClusterClient.create(clusterURI); |
| 137 | + List<RedisURI> clusterURIs = buildClusterURIs(url); |
| 138 | + clusterClient = RedisClusterClient.create(clusterURIs); |
139 | 139 | clusterConnection = clusterClient.connect(); |
140 | 140 | async = clusterConnection.async(); |
141 | 141 | default: |
142 | 142 | break; |
143 | 143 | } |
144 | 144 | } |
145 | 145 |
|
| 146 | + private List<RedisURI> buildClusterURIs(String url) { |
| 147 | + String password = redisSideTableInfo.getPassword(); |
| 148 | + String database = redisSideTableInfo.getDatabase(); |
| 149 | + String[] addresses = StringUtils.split(url, ","); |
| 150 | + List<RedisURI> redisURIs = new ArrayList<>(addresses.length); |
| 151 | + for (String addr : addresses){ |
| 152 | + HostAndPort hostAndPort = HostAndPort.parse(addr); |
| 153 | + RedisURI redisURI = RedisURI.create(hostAndPort.hostText, hostAndPort.port); |
| 154 | + if (StringUtils.isNotEmpty(password)) { |
| 155 | + redisURI.setPassword(password); |
| 156 | + } |
| 157 | + if (StringUtils.isNotEmpty(database)) { |
| 158 | + redisURI.setDatabase(Integer.valueOf(database)); |
| 159 | + } |
| 160 | + redisURIs.add(redisURI); |
| 161 | + } |
| 162 | + return redisURIs; |
| 163 | + } |
| 164 | + |
146 | 165 | private RedisURI.Builder buildSentinelUri( |
147 | 166 | String host, |
148 | 167 | String port, |
@@ -171,7 +190,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re |
171 | 190 | if (MapUtils.isNotEmpty(values)) { |
172 | 191 | try { |
173 | 192 | BaseRow row = fillData(input, values); |
174 | | - dealCacheData(key,CacheObj.buildCacheObj(ECacheContentType.SingleLine, row)); |
| 193 | + dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.SingleLine, values)); |
175 | 194 | RowDataComplete.completeBaseRow(resultFuture, row); |
176 | 195 | } catch (Exception e) { |
177 | 196 | dealFillDataError(input, resultFuture, e); |
|
0 commit comments