|
21 | 21 | import com.dtstack.flink.sql.side.AbstractSideTableInfo; |
22 | 22 | import com.dtstack.flink.sql.side.BaseAsyncReqRow; |
23 | 23 | import com.dtstack.flink.sql.util.RowDataComplete; |
| 24 | +import io.lettuce.core.api.async.RedisAsyncCommands; |
| 25 | +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; |
24 | 26 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
25 | 27 | import org.apache.flink.configuration.Configuration; |
26 | 28 | import org.apache.flink.streaming.api.functions.async.ResultFuture; |
@@ -84,37 +86,36 @@ public void open(Configuration parameters) throws Exception { |
84 | 86 | private void buildRedisClient(RedisSideTableInfo tableInfo){ |
85 | 87 | String url = redisSideTableInfo.getUrl(); |
86 | 88 | String password = redisSideTableInfo.getPassword(); |
87 | | - if (password != null){ |
88 | | - password = password + "@"; |
89 | | - } else { |
90 | | - password = ""; |
91 | | - } |
| 89 | + |
92 | 90 | String database = redisSideTableInfo.getDatabase(); |
93 | 91 | if (database == null){ |
94 | 92 | database = "0"; |
95 | 93 | } |
96 | 94 | switch (RedisType.parse(tableInfo.getRedisType())){ |
97 | 95 | case STANDALONE: |
98 | 96 | StringBuilder redisUri = new StringBuilder(); |
99 | | - redisUri.append("redis://").append(password).append(url).append("/").append(database); |
| 97 | + redisUri.append("redis://").append(url).append("/").append(database); |
100 | 98 | redisClient = RedisClient.create(redisUri.toString()); |
101 | 99 | connection = redisClient.connect(); |
102 | 100 | async = connection.async(); |
| 101 | + ((RedisAsyncCommands)async).auth(password); |
103 | 102 | break; |
104 | 103 | case SENTINEL: |
105 | 104 | StringBuilder sentinelUri = new StringBuilder(); |
106 | | - sentinelUri.append("redis-sentinel://").append(password) |
| 105 | + sentinelUri.append("redis-sentinel://") |
107 | 106 | .append(url).append("/").append(database).append("#").append(redisSideTableInfo.getMasterName()); |
108 | 107 | redisClient = RedisClient.create(sentinelUri.toString()); |
109 | 108 | connection = redisClient.connect(); |
110 | 109 | async = connection.async(); |
| 110 | + ((RedisAsyncCommands)async).auth(password); |
111 | 111 | break; |
112 | 112 | case CLUSTER: |
113 | 113 | StringBuilder clusterUri = new StringBuilder(); |
114 | | - clusterUri.append("redis://").append(password).append(url); |
| 114 | + clusterUri.append("redis://").append(url); |
115 | 115 | clusterClient = RedisClusterClient.create(clusterUri.toString()); |
116 | 116 | clusterConnection = clusterClient.connect(); |
117 | 117 | async = clusterConnection.async(); |
| 118 | + ((RedisAdvancedClusterAsyncCommands)async).auth(password); |
118 | 119 | default: |
119 | 120 | break; |
120 | 121 | } |
|
0 commit comments