|
39 | 39 | import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; |
40 | 40 | import org.apache.flink.types.Row; |
41 | 41 |
|
42 | | -import java.sql.Timestamp; |
| 42 | +import java.util.ArrayList; |
43 | 43 | import java.util.Collections; |
44 | 44 | import java.util.List; |
45 | 45 | import java.util.Map; |
@@ -123,12 +123,18 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except |
123 | 123 | for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) { |
124 | 124 | Integer conValIndex = sideInfo.getEqualValIndex().get(i); |
125 | 125 | Object equalObj = input.getField(conValIndex); |
| 126 | + |
| 127 | + String value = ""; |
| 128 | + |
126 | 129 | if(equalObj == null){ |
127 | 130 | resultFuture.complete(null); |
| 131 | + value = "null"; |
| 132 | + } else { |
| 133 | + value = equalObj.toString(); |
128 | 134 | } |
129 | 135 |
|
130 | 136 | keyData.add(sideInfo.getEqualFieldList().get(i)); |
131 | | - keyData.add((String) equalObj); |
| 137 | + keyData.add(value); |
132 | 138 | } |
133 | 139 |
|
134 | 140 | String key = buildCacheKey(keyData); |
@@ -158,29 +164,34 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except |
158 | 164 | Map<String, String> keyValue = Maps.newHashMap(); |
159 | 165 | List<String> value = async.keys(key + ":*").get(); |
160 | 166 | String[] values = value.toArray(new String[value.size()]); |
161 | | - RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values); |
162 | | - future.thenAccept(new Consumer<List<KeyValue<String, String>>>() { |
163 | | - @Override |
164 | | - public void accept(List<KeyValue<String, String>> keyValues) { |
165 | | - if (keyValues.size() != 0){ |
166 | | - for (int i=0; i<keyValues.size(); i++){ |
167 | | - String[] splitKeys = keyValues.get(i).getKey().split(":"); |
168 | | - keyValue.put(splitKeys[1], splitKeys[2]); |
169 | | - keyValue.put(splitKeys[3], keyValues.get(i).getValue()); |
170 | | - } |
171 | | - Row row = fillData(input, keyValue); |
172 | | - resultFuture.complete(Collections.singleton(row)); |
173 | | - if(openCache()){ |
174 | | - putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue)); |
175 | | - } |
176 | | - } else { |
177 | | - dealMissKey(input, resultFuture); |
178 | | - if(openCache()){ |
179 | | - putCache(key, CacheMissVal.getMissKeyObj()); |
| 167 | + if (values.length == 0){ |
| 168 | + Row row = fillData(input, null); |
| 169 | + resultFuture.complete(Collections.singleton(row)); |
| 170 | + } else { |
| 171 | + RedisFuture<List<KeyValue<String, String>>> future = ((RedisStringAsyncCommands) async).mget(values); |
| 172 | + future.thenAccept(new Consumer<List<KeyValue<String, String>>>() { |
| 173 | + @Override |
| 174 | + public void accept(List<KeyValue<String, String>> keyValues) { |
| 175 | + if (keyValues.size() != 0) { |
| 176 | + for (int i = 0; i < keyValues.size(); i++) { |
| 177 | + String[] splitKeys = keyValues.get(i).getKey().split(":"); |
| 178 | + keyValue.put(splitKeys[1], splitKeys[2]); |
| 179 | + keyValue.put(splitKeys[3], keyValues.get(i).getValue()); |
| 180 | + } |
| 181 | + Row row = fillData(input, keyValue); |
| 182 | + resultFuture.complete(Collections.singleton(row)); |
| 183 | + if (openCache()) { |
| 184 | + putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, keyValue)); |
| 185 | + } |
| 186 | + } else { |
| 187 | + dealMissKey(input, resultFuture); |
| 188 | + if (openCache()) { |
| 189 | + putCache(key, CacheMissVal.getMissKeyObj()); |
| 190 | + } |
180 | 191 | } |
181 | 192 | } |
182 | | - } |
183 | | - }); |
| 193 | + }); |
| 194 | + } |
184 | 195 | } |
185 | 196 |
|
186 | 197 | private String buildCacheKey(List<String> keyData) { |
|
0 commit comments