|
36 | 36 | import org.apache.flink.calcite.shaded.com.google.common.collect.Maps; |
37 | 37 | import org.apache.flink.configuration.Configuration; |
38 | 38 | import org.apache.flink.streaming.api.functions.async.ResultFuture; |
39 | | -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; |
40 | 39 | import org.apache.flink.types.Row; |
41 | 40 |
|
42 | | -import java.sql.Timestamp; |
43 | 41 | import java.util.Collections; |
44 | 42 | import java.util.List; |
45 | 43 | import java.util.Map; |
@@ -124,12 +122,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except |
124 | 122 | Integer conValIndex = sideInfo.getEqualValIndex().get(i); |
125 | 123 | Object equalObj = input.getField(conValIndex); |
126 | 124 | if(equalObj == null){ |
127 | | - resultFuture.complete(null); |
| 125 | + dealMissKey(input, resultFuture); |
128 | 126 | return; |
129 | 127 | } |
130 | | - |
| 128 | + String value = equalObj.toString(); |
131 | 129 | keyData.add(sideInfo.getEqualFieldList().get(i)); |
132 | | - keyData.add((String) equalObj); |
| 130 | + keyData.add(value); |
133 | 131 | } |
134 | 132 |
|
135 | 133 | String key = buildCacheKey(keyData); |
@@ -159,29 +157,33 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except |
159 | 157 | Map<String, String> keyValue = Maps.newHashMap(); |
160 | 158 | List<String> value = async.keys(key + ":*").get(); |
161 | 159 | String[] values = value.toArray(new String[value.size()]); |
162 | | - RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values); |
163 | | - future.thenAccept(new Consumer<List<KeyValue<String, String>>>() { |
164 | | - @Override |
165 | | - public void accept(List<KeyValue<String, String>> keyValues) { |
166 | | - if (keyValues.size() != 0){ |
167 | | - for (int i=0; i<keyValues.size(); i++){ |
168 | | - String[] splitKeys = keyValues.get(i).getKey().split(":"); |
169 | | - keyValue.put(splitKeys[1], splitKeys[2]); |
170 | | - keyValue.put(splitKeys[3], keyValues.get(i).getValue()); |
171 | | - } |
172 | | - Row row = fillData(input, keyValue); |
173 | | - resultFuture.complete(Collections.singleton(row)); |
174 | | - if(openCache()){ |
175 | | - putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue)); |
176 | | - } |
177 | | - } else { |
178 | | - dealMissKey(input, resultFuture); |
179 | | - if(openCache()){ |
180 | | - putCache(key, CacheMissVal.getMissKeyObj()); |
| 160 | + if (values.length == 0){ |
| 161 | + dealMissKey(input, resultFuture); |
| 162 | + } else { |
| 163 | + RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values); |
| 164 | + future.thenAccept(new Consumer<List<KeyValue<String, String>>>() { |
| 165 | + @Override |
| 166 | + public void accept(List<KeyValue<String, String>> keyValues) { |
| 167 | + if (keyValues.size() != 0) { |
| 168 | + for (int i = 0; i < keyValues.size(); i++) { |
| 169 | + String[] splitKeys = keyValues.get(i).getKey().split(":"); |
| 170 | + keyValue.put(splitKeys[1], splitKeys[2]); |
| 171 | + keyValue.put(splitKeys[3], keyValues.get(i).getValue()); |
| 172 | + } |
| 173 | + Row row = fillData(input, keyValue); |
| 174 | + resultFuture.complete(Collections.singleton(row)); |
| 175 | + if (openCache()) { |
| 176 | + putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue)); |
| 177 | + } |
| 178 | + } else { |
| 179 | + dealMissKey(input, resultFuture); |
| 180 | + if (openCache()) { |
| 181 | + putCache(key, CacheMissVal.getMissKeyObj()); |
| 182 | + } |
181 | 183 | } |
182 | 184 | } |
183 | | - } |
184 | | - }); |
| 185 | + }); |
| 186 | + } |
185 | 187 | } |
186 | 188 |
|
187 | 189 | private String buildCacheKey(List<String> keyData) { |
|
0 commit comments