|
18 | 18 |
|
19 | 19 | package com.dtstack.flink.sql.side.cassandra; |
20 | 20 |
|
| 21 | +import org.apache.flink.api.java.typeutils.RowTypeInfo; |
| 22 | +import org.apache.flink.table.runtime.types.CRow; |
| 23 | +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; |
| 24 | +import org.apache.flink.types.Row; |
| 25 | +import org.apache.flink.util.Collector; |
| 26 | + |
21 | 27 | import com.datastax.driver.core.Cluster; |
22 | 28 | import com.datastax.driver.core.ConsistencyLevel; |
23 | 29 | import com.datastax.driver.core.HostDistance; |
|
33 | 39 | import com.dtstack.flink.sql.side.JoinInfo; |
34 | 40 | import com.dtstack.flink.sql.side.SideTableInfo; |
35 | 41 | import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo; |
36 | | -import org.apache.calcite.sql.JoinType; |
37 | | -import org.apache.commons.collections.CollectionUtils; |
38 | | -import org.apache.flink.api.java.typeutils.RowTypeInfo; |
39 | 42 | import com.google.common.collect.Lists; |
40 | 43 | import com.google.common.collect.Maps; |
41 | | -import org.apache.flink.table.runtime.types.CRow; |
42 | | -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; |
43 | | -import org.apache.flink.types.Row; |
44 | | -import org.apache.flink.util.Collector; |
| 44 | +import org.apache.calcite.sql.JoinType; |
| 45 | +import org.apache.commons.collections.CollectionUtils; |
| 46 | +import org.apache.commons.lang3.StringUtils; |
45 | 47 | import org.slf4j.Logger; |
46 | 48 | import org.slf4j.LoggerFactory; |
47 | 49 |
|
@@ -222,9 +224,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) { |
222 | 224 | //重试策略 |
223 | 225 | RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE; |
224 | 226 |
|
225 | | - for (String server : address.split(",")) { |
226 | | - cassandraPort = Integer.parseInt(server.split(":")[1]); |
227 | | - serversList.add(InetAddress.getByName(server.split(":")[0])); |
| 227 | + for (String server : StringUtils.split(address, ",")) { |
| 228 | + cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]); |
| 229 | + serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0])); |
228 | 230 | } |
229 | 231 |
|
230 | 232 | if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) { |
@@ -278,7 +280,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ |
278 | 280 | //load data from table |
279 | 281 | String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE; |
280 | 282 | ResultSet resultSet = session.execute(sql); |
281 | | - String[] sideFieldNames = sideInfo.getSideSelectFields().split(","); |
| 283 | + String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ","); |
282 | 284 | for (com.datastax.driver.core.Row row : resultSet) { |
283 | 285 | Map<String, Object> oneRow = Maps.newHashMap(); |
284 | 286 | for (String fieldName : sideFieldNames) { |
|
0 commit comments