Skip to content

Commit 0d4b0fd

Browse files
authored
Merge pull request #55 from XuQianJin-Stars/v1.5.0_dev
add cassandra side and sink
2 parents 7c2e330 + 87c845f commit 0d4b0fd

21 files changed

Lines changed: 2460 additions & 4 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!-- Shaded "all side" Cassandra module: bundles sql.side.cassandra.core into a single
     plugin jar and copies/renames it into the runtime plugins/cassandraallside directory. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.side.cassandra</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.side.all.cassandra</artifactId>
    <name>cassandra-all-side</name>

    <packaging>jar</packaging>

    <dependencies>
        <!-- The actual side-table implementation; everything below is packaging. -->
        <dependency>
            <groupId>com.dtstack.flink</groupId>
            <artifactId>sql.side.cassandra.core</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build an uber jar at package time. Signature files (*.SF/*.DSA/*.RSA)
                 from signed dependencies are stripped so the merged jar does not fail
                 signature verification at load time. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <!-- NOTE(review): 1.4 is very old; newer shade versions use <target>-style
                     config improvements — confirm before upgrading. -->
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <!-- Intentionally empty: shade everything. -->
                                <excludes>

                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Deploy step: copy the shaded jar into the plugins directory and rename
                 it from artifactId-version.jar to the stable project-name jar the
                 plugin loader expects. -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../../plugins/cassandraallside">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar" />
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../../plugins/cassandraallside/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../../plugins/cassandraallside/${project.name}.jar" />
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.cassandra;
20+
21+
import com.datastax.driver.core.Cluster;
22+
import com.datastax.driver.core.ConsistencyLevel;
23+
import com.datastax.driver.core.HostDistance;
24+
import com.datastax.driver.core.PoolingOptions;
25+
import com.datastax.driver.core.QueryOptions;
26+
import com.datastax.driver.core.ResultSet;
27+
import com.datastax.driver.core.Session;
28+
import com.datastax.driver.core.SocketOptions;
29+
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
30+
import com.datastax.driver.core.policies.RetryPolicy;
31+
import com.dtstack.flink.sql.side.AllReqRow;
32+
import com.dtstack.flink.sql.side.FieldInfo;
33+
import com.dtstack.flink.sql.side.JoinInfo;
34+
import com.dtstack.flink.sql.side.SideTableInfo;
35+
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
36+
import org.apache.calcite.sql.JoinType;
37+
import org.apache.commons.collections.CollectionUtils;
38+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
39+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
40+
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
41+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
42+
import org.apache.flink.types.Row;
43+
import org.apache.flink.util.Collector;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
47+
import java.net.InetAddress;
48+
import java.sql.SQLException;
49+
import java.sql.Timestamp;
50+
import java.util.ArrayList;
51+
import java.util.Calendar;
52+
import java.util.List;
53+
import java.util.Map;
54+
import java.util.concurrent.atomic.AtomicReference;
55+
56+
/**
57+
* Reason:
58+
* Date: 2018/11/22
59+
*
60+
* @author xuqianjin
61+
*/
62+
public class CassandraAllReqRow extends AllReqRow {
63+
64+
private static final long serialVersionUID = 54015343561288219L;
65+
66+
private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);
67+
68+
private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
69+
70+
private static final int CONN_RETRY_NUM = 3;
71+
72+
private static final int FETCH_SIZE = 1000;
73+
74+
private transient Cluster cluster;
75+
private transient Session session = null;
76+
77+
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
78+
79+
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
80+
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
81+
}
82+
83+
@Override
84+
protected Row fillData(Row input, Object sideInput) {
85+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
86+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
87+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
88+
Object obj = input.getField(entry.getValue());
89+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
90+
91+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
92+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
93+
obj = ((Timestamp) obj).getTime();
94+
}
95+
row.setField(entry.getKey(), obj);
96+
}
97+
98+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
99+
if (cacheInfo == null) {
100+
row.setField(entry.getKey(), null);
101+
} else {
102+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
103+
}
104+
}
105+
106+
return row;
107+
}
108+
109+
@Override
110+
protected void initCache() throws SQLException {
111+
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
112+
cacheRef.set(newCache);
113+
loadData(newCache);
114+
}
115+
116+
@Override
117+
protected void reloadCache() {
118+
//reload cacheRef and replace to old cacheRef
119+
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
120+
try {
121+
loadData(newCache);
122+
} catch (SQLException e) {
123+
LOG.error("", e);
124+
}
125+
126+
cacheRef.set(newCache);
127+
LOG.info("----- cassandra all cacheRef reload end:{}", Calendar.getInstance());
128+
}
129+
130+
131+
@Override
132+
public void flatMap(Row value, Collector<Row> out) throws Exception {
133+
List<Object> inputParams = Lists.newArrayList();
134+
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
135+
Object equalObj = value.getField(conValIndex);
136+
if (equalObj == null) {
137+
out.collect(null);
138+
}
139+
140+
inputParams.add(equalObj);
141+
}
142+
143+
String key = buildKey(inputParams);
144+
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
145+
if (CollectionUtils.isEmpty(cacheList)) {
146+
if (sideInfo.getJoinType() == JoinType.LEFT) {
147+
Row row = fillData(value, null);
148+
out.collect(row);
149+
} else {
150+
return;
151+
}
152+
153+
return;
154+
}
155+
156+
for (Map<String, Object> one : cacheList) {
157+
out.collect(fillData(value, one));
158+
}
159+
160+
}
161+
162+
private String buildKey(List<Object> equalValList) {
163+
StringBuilder sb = new StringBuilder("");
164+
for (Object equalVal : equalValList) {
165+
sb.append(equalVal).append("_");
166+
}
167+
168+
return sb.toString();
169+
}
170+
171+
private String buildKey(Map<String, Object> val, List<String> equalFieldList) {
172+
StringBuilder sb = new StringBuilder("");
173+
for (String equalField : equalFieldList) {
174+
sb.append(val.get(equalField)).append("_");
175+
}
176+
177+
return sb.toString();
178+
}
179+
180+
private Session getConn(CassandraSideTableInfo tableInfo) {
181+
try {
182+
if (session == null) {
183+
QueryOptions queryOptions = new QueryOptions();
184+
//The default consistency level for queries: ConsistencyLevel.TWO.
185+
queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);
186+
Integer maxRequestsPerConnection = tableInfo.getMaxRequestsPerConnection() == null ? 1 : tableInfo.getMaxRequestsPerConnection();
187+
Integer coreConnectionsPerHost = tableInfo.getCoreConnectionsPerHost() == null ? 8 : tableInfo.getCoreConnectionsPerHost();
188+
Integer maxConnectionsPerHost = tableInfo.getMaxConnectionsPerHost() == null ? 32768 : tableInfo.getMaxConnectionsPerHost();
189+
Integer maxQueueSize = tableInfo.getMaxQueueSize() == null ? 100000 : tableInfo.getMaxQueueSize();
190+
Integer readTimeoutMillis = tableInfo.getReadTimeoutMillis() == null ? 60000 : tableInfo.getReadTimeoutMillis();
191+
Integer connectTimeoutMillis = tableInfo.getConnectTimeoutMillis() == null ? 60000 : tableInfo.getConnectTimeoutMillis();
192+
Integer poolTimeoutMillis = tableInfo.getPoolTimeoutMillis() == null ? 60000 : tableInfo.getPoolTimeoutMillis();
193+
Integer cassandraPort = 0;
194+
String address = tableInfo.getAddress();
195+
String userName = tableInfo.getUserName();
196+
String password = tableInfo.getPassword();
197+
String database = tableInfo.getDatabase();
198+
199+
ArrayList serversList = new ArrayList();
200+
//Read timeout or connection timeout Settings
201+
SocketOptions so = new SocketOptions()
202+
.setReadTimeoutMillis(readTimeoutMillis)
203+
.setConnectTimeoutMillis(connectTimeoutMillis);
204+
205+
//The cluster USES hostdistance.local in the same machine room
206+
//Hostdistance. REMOTE is used for different machine rooms
207+
//Ignore use HostDistance. IGNORED
208+
PoolingOptions poolingOptions = new PoolingOptions()
209+
//Each connection allows a maximum of 64 concurrent requests
210+
.setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
211+
//Have at least two connections to each machine in the cluster
212+
.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost)
213+
//There are up to eight connections to each machine in the cluster
214+
.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost)
215+
.setMaxQueueSize(maxQueueSize)
216+
.setPoolTimeoutMillis(poolTimeoutMillis);
217+
//重试策略
218+
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
219+
220+
for (String server : address.split(",")) {
221+
cassandraPort = Integer.parseInt(server.split(":")[1]);
222+
serversList.add(InetAddress.getByName(server.split(":")[0]));
223+
}
224+
225+
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
226+
cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy)
227+
.withPort(cassandraPort)
228+
.withPoolingOptions(poolingOptions).withSocketOptions(so)
229+
.withQueryOptions(queryOptions).build();
230+
} else {
231+
cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy)
232+
.withPort(cassandraPort)
233+
.withPoolingOptions(poolingOptions).withSocketOptions(so)
234+
.withCredentials(userName, password)
235+
.withQueryOptions(queryOptions).build();
236+
}
237+
// 建立连接 连接已存在的键空间
238+
session = cluster.connect(database);
239+
LOG.info("connect cassandra is successed!");
240+
}
241+
} catch (Exception e) {
242+
LOG.error("connect cassandra is error:" + e.getMessage());
243+
}
244+
return session;
245+
}
246+
247+
248+
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
249+
CassandraSideTableInfo tableInfo = (CassandraSideTableInfo) sideInfo.getSideTableInfo();
250+
Session session = null;
251+
252+
try {
253+
for (int i = 0; i < CONN_RETRY_NUM; i++) {
254+
try {
255+
session = getConn(tableInfo);
256+
break;
257+
} catch (Exception e) {
258+
if (i == CONN_RETRY_NUM - 1) {
259+
throw new RuntimeException("", e);
260+
}
261+
try {
262+
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
263+
+ ",pwd:" + tableInfo.getPassword();
264+
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
265+
Thread.sleep(5 * 1000);
266+
} catch (InterruptedException e1) {
267+
e1.printStackTrace();
268+
}
269+
}
270+
271+
}
272+
273+
//load data from table
274+
String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
275+
ResultSet resultSet = session.execute(sql);
276+
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
277+
for (com.datastax.driver.core.Row row : resultSet) {
278+
Map<String, Object> oneRow = Maps.newHashMap();
279+
for (String fieldName : sideFieldNames) {
280+
oneRow.put(fieldName.trim(), row.getObject(fieldName.trim()));
281+
}
282+
String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList());
283+
List<Map<String, Object>> list = tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList());
284+
list.add(oneRow);
285+
}
286+
} catch (Exception e) {
287+
LOG.error("", e);
288+
} finally {
289+
try {
290+
if (session != null) {
291+
session.close();
292+
}
293+
} catch (Exception e) {
294+
LOG.error("Error while closing session.", e);
295+
}
296+
try {
297+
if (cluster != null) {
298+
cluster.close();
299+
}
300+
} catch (Exception e) {
301+
LOG.error("Error while closing cluster.", e);
302+
}
303+
}
304+
}
305+
}

0 commit comments

Comments
 (0)