|
31 | 31 | import io.lettuce.core.api.async.RedisStringAsyncCommands; |
32 | 32 | import io.lettuce.core.cluster.RedisClusterClient; |
33 | 33 | import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; |
| 34 | +import org.apache.calcite.sql.JoinType; |
34 | 35 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
35 | 36 | import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; |
36 | 37 | import org.apache.flink.calcite.shaded.com.google.common.collect.Maps; |
|
39 | 40 | import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; |
40 | 41 | import org.apache.flink.types.Row; |
41 | 42 |
|
42 | | -import java.sql.Timestamp; |
| 43 | +import java.util.ArrayList; |
43 | 44 | import java.util.Collections; |
44 | 45 | import java.util.List; |
45 | 46 | import java.util.Map; |
@@ -123,12 +124,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except |
123 | 124 | for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) { |
124 | 125 | Integer conValIndex = sideInfo.getEqualValIndex().get(i); |
125 | 126 | Object equalObj = input.getField(conValIndex); |
| 127 | + |
126 | 128 | if(equalObj == null){ |
127 | | - resultFuture.complete(null); |
| 129 | + dealMissKey(input, resultFuture); |
| 130 | + return; |
128 | 131 | } |
129 | | - |
| 132 | + String value = equalObj.toString(); |
130 | 133 | keyData.add(sideInfo.getEqualFieldList().get(i)); |
131 | | - keyData.add((String) equalObj); |
| 134 | + keyData.add(value); |
132 | 135 | } |
133 | 136 |
|
134 | 137 | String key = buildCacheKey(keyData); |
@@ -158,29 +161,33 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except |
158 | 161 | Map<String, String> keyValue = Maps.newHashMap(); |
159 | 162 | List<String> value = async.keys(key + ":*").get(); |
160 | 163 | String[] values = value.toArray(new String[value.size()]); |
161 | | - RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values); |
162 | | - future.thenAccept(new Consumer<List<KeyValue<String, String>>>() { |
163 | | - @Override |
164 | | - public void accept(List<KeyValue<String, String>> keyValues) { |
165 | | - if (keyValues.size() != 0){ |
166 | | - for (int i=0; i<keyValues.size(); i++){ |
167 | | - String[] splitKeys = keyValues.get(i).getKey().split(":"); |
168 | | - keyValue.put(splitKeys[1], splitKeys[2]); |
169 | | - keyValue.put(splitKeys[3], keyValues.get(i).getValue()); |
170 | | - } |
171 | | - Row row = fillData(input, keyValue); |
172 | | - resultFuture.complete(Collections.singleton(row)); |
173 | | - if(openCache()){ |
174 | | - putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue)); |
175 | | - } |
176 | | - } else { |
177 | | - dealMissKey(input, resultFuture); |
178 | | - if(openCache()){ |
179 | | - putCache(key, CacheMissVal.getMissKeyObj()); |
| 164 | + if (values.length == 0){ |
| 165 | + dealMissKey(input, resultFuture); |
| 166 | + } else { |
| 167 | + RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values); |
| 168 | + future.thenAccept(new Consumer<List<KeyValue<String, String>>>() { |
| 169 | + @Override |
| 170 | + public void accept(List<KeyValue<String, String>> keyValues) { |
| 171 | + if (keyValues.size() != 0) { |
| 172 | + for (int i = 0; i < keyValues.size(); i++) { |
| 173 | + String[] splitKeys = keyValues.get(i).getKey().split(":"); |
| 174 | + keyValue.put(splitKeys[1], splitKeys[2]); |
| 175 | + keyValue.put(splitKeys[3], keyValues.get(i).getValue()); |
| 176 | + } |
| 177 | + Row row = fillData(input, keyValue); |
| 178 | + resultFuture.complete(Collections.singleton(row)); |
| 179 | + if (openCache()) { |
| 180 | + putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue)); |
| 181 | + } |
| 182 | + } else { |
| 183 | + dealMissKey(input, resultFuture); |
| 184 | + if (openCache()) { |
| 185 | + putCache(key, CacheMissVal.getMissKeyObj()); |
| 186 | + } |
180 | 187 | } |
181 | 188 | } |
182 | | - } |
183 | | - }); |
| 189 | + }); |
| 190 | + } |
184 | 191 | } |
185 | 192 |
|
186 | 193 | private String buildCacheKey(List<String> keyData) { |
|
0 commit comments