Skip to content

Commit 1ed345a

Browse files
committed
[feat] mysql consumer
1 parent 7ee482d commit 1ed345a

6 files changed

Lines changed: 125 additions & 45 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.io.Serializable;
26-
import java.sql.SQLException;
2726
import java.util.Map;
2827
import java.util.concurrent.LinkedBlockingQueue;
2928
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,8 +37,8 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
3837

3938
protected static final Logger LOG = LoggerFactory.getLogger(AbstractDirtyDataConsumer.class);
4039

41-
protected long errorLimit = 1000;
42-
protected long errorCount = 0;
40+
protected Long errorLimit = 1000L;
41+
protected Long errorCount = 0L;
4342

4443
protected long count = 0;
4544

@@ -49,6 +48,8 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
4948

5049
/**
5150
* Consumes data from the queue.
51+
*
52+
* @throws Exception throw exception
5253
*/
5354
public abstract void consume() throws Exception;
5455

@@ -59,6 +60,9 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
5960

6061
/**
6162
* Initializes the consumer and its scheduled task.
63+
*
64+
* @param properties task parameters
65+
* @throws Exception throw exception
6266
*/
6367
public abstract void init(Map<String, String> properties) throws Exception;
6468

@@ -77,9 +81,9 @@ public void run() {
7781
consume();
7882
}
7983
} catch (Exception e) {
80-
LOG.error("consume dirtyData error");
84+
LOG.error("consume dirtyData error", e);
8185
errorCount++;
82-
if (errorCount == errorLimit) {
86+
if (errorCount.equals(errorLimit)) {
8387
throw new RuntimeException("脏数据消费失败达到上限,任务失败");
8488
}
8589
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/entity/DirtyDataEntity.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,12 @@ public String toString() {
9494
", field='" + field + '\'' +
9595
'}';
9696
}
97+
98+
/**
99+
* Returns the dirty-data record as a string array.
100+
* @return array holding, in order: dirtyData, processDate (converted with String.valueOf), cause, field
101+
*/
102+
public String[] get() {
103+
return new String[] {dirtyData, String.valueOf(processDate), cause, field};
104+
}
97105
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2222
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
2323
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
24+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2425
import com.dtstack.flink.sql.util.PluginUtil;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
@@ -30,6 +31,7 @@
3031
import java.lang.reflect.Constructor;
3132
import java.util.Map;
3233
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.concurrent.ThreadPoolExecutor;
3335
import java.util.concurrent.TimeUnit;
3436
import java.util.concurrent.atomic.AtomicLong;
3537

@@ -39,7 +41,7 @@
3941
* Date 2020/8/27 星期四
4042
*/
4143
public class DirtyDataManager implements Serializable {
42-
//TODO 需要确保产生脏数据后至少一次是成功发送给了consumer,至于consumer是否成功消费,manager不需要关心
44+
4345
private static final long serialVersionUID = 7190970299538893497L;
4446

4547
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
@@ -72,6 +74,14 @@ public class DirtyDataManager implements Serializable {
7274

7375
public static AbstractDirtyDataConsumer consumer;
7476

77+
private static ThreadPoolExecutor dirtyDataConsumer;
78+
79+
public final static int MAX_POOL_SIZE_LIMIT = 5;
80+
81+
private final static int MAX_TASK_QUEUE_SIZE = 100;
82+
83+
private final static String DEFAULT_TYPE = "console";
84+
7585
/**
7686
* 通过参数生成manager实例,并同时将consumer实例化
7787
*/
@@ -80,17 +90,19 @@ public static DirtyDataManager newInstance(Map<String, String> properties) throw
8090
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", "60"));
8191
consumer = createConsumer(properties);
8292
consumer.init(properties);
83-
//TODO 使用线程池创建线程,不要用thread
84-
Thread dirtyDataConsumer = new Thread(consumer.setQueue(manager.queue), "dirtyData Consumer");
85-
dirtyDataConsumer.start();
93+
consumer.setQueue(manager.queue);
94+
dirtyDataConsumer = new ThreadPoolExecutor(MAX_POOL_SIZE_LIMIT, MAX_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
95+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer"), new ThreadPoolExecutor.CallerRunsPolicy());
96+
dirtyDataConsumer.execute(consumer);
97+
8698
return manager;
8799
}
88100

89101
/**
90102
* 通过动态加载的方式加载Consumer
91103
*/
92104
private static AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
93-
String type = properties.getOrDefault("type", "print");
105+
String type = properties.getOrDefault("type", DEFAULT_TYPE);
94106
String consumerType = DIRTY_CONSUMER_PATH + File.separator + type;
95107
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, properties.getOrDefault("pluginPath", null), "shipfile");
96108
String className = CLASS_PRE_STR + "." + type.toLowerCase() + "." + upperCaseFirstChar(type + CLASS_POST_STR);
@@ -106,6 +118,7 @@ private static AbstractDirtyDataConsumer createConsumer(Map<String, String> prop
106118
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
107119
*/
108120
public void close() throws Exception {
121+
dirtyDataConsumer.shutdown();
109122
if (!queue.isEmpty() && checkConsumer()) {
110123
consumer.consume();
111124
}
@@ -114,7 +127,7 @@ public void close() throws Exception {
114127
}
115128

116129
/**
117-
* 收集脏数据放入队列缓存中,记录放入失败的数目和存入队列中的总数目,如果放入失败的数目超过一定比列,那么manager任务失败
130+
* 收集脏数据放入队列缓存中,记录放入失败的数目和存入队列中的总数目,如果放入失败的数目超过一定比例,那么manager任务失败
118131
*/
119132
public void collectDirtyData(String dataInfo, String cause, String field) {
120133
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause, field);
@@ -124,6 +137,7 @@ public void collectDirtyData(String dataInfo, String cause, String field) {
124137
} catch (Exception ignored) {
125138
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
126139
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
140+
// TODO 这个失败比例可以作出调整
127141
if (errorCount.get() > Math.ceil(count.longValue() * 0.8)) {
128142
throw new RuntimeException("The number of failed number reaches the limit, manager fails");
129143
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,25 @@
99
* Date 2020/8/28 星期五
1010
*/
1111
public class TestMain { // Manual driver: builds a DirtyDataManager from hard-coded properties and feeds it synthetic records.
12+
private static final Integer DATA_NUMBER = 10000; // number of synthetic records sent by main
13+
1214
public static void main(String[] args) throws Exception {
13-
Map<String, String> properties = new HashMap<>();
14-
properties.put("type", "console");
15-
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQL/sqlplugins");
15+
Map<String, String> properties = new HashMap<>(8);
16+
properties.put("type", "mysql");
17+
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQLTemp/sqlplugins"); // NOTE(review): developer-local absolute path; will not resolve on other machines
18+
properties.put("url", "jdbc:mysql://kerberos01:3306/tiezhu");
19+
properties.put("userName", "dtstack");
20+
properties.put("password", "abc123"); // NOTE(review): credential committed in source; prefer reading it from local config or the environment
21+
properties.put("isCreatedTable", "false");
22+
properties.put("batchSize", "1");
23+
properties.put("tableName", "DirtyDataFromMysql_2020_09_14_10_51_50");
1624

1725
DirtyDataManager manager = DirtyDataManager.newInstance(properties);
18-
for (int i = 0; i < 100; i++) {
19-
Thread.sleep(1000);
20-
manager.collectDirtyData("testDirtyData", new Exception("testException").getMessage(), "testField");
26+
for (int i = 0; i < DATA_NUMBER; i++) {
27+
Thread.sleep(100);
28+
manager.collectDirtyData("testDirtyData" + i,
29+
new Exception("testException" + i).getMessage(), "testField");
2130
}
31+
manager.close(); // shuts the manager down once every synthetic record has been handed over
2232
}
2333
}

dirtyDataManager/console/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
</goals>
5959
<configuration>
6060
<tasks>
61-
<copy todir="${basedir}/../../sqlplugins/dirtyData/print">
61+
<copy todir="${basedir}/../../sqlplugins/dirtyData/console">
6262
<fileset dir="target/">
6363
<include name="${project.artifactId}-${project.version}.jar"/>
6464
</fileset>

dirtyDataManager/mysql/src/main/java/com/dtstack/flink/sql/dirty/mysql/MysqlDirtyDataConsumer.java

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,90 +19,127 @@
1919
package com.dtstack.flink.sql.dirty.mysql;
2020

2121
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
22+
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
2223

2324
import java.sql.Connection;
2425
import java.sql.DriverManager;
2526
import java.sql.PreparedStatement;
2627
import java.sql.SQLException;
28+
import java.sql.Statement;
29+
import java.text.SimpleDateFormat;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Collections;
33+
import java.util.List;
2734
import java.util.Map;
35+
import java.util.stream.Collectors;
2836

2937
/**
3038
* @author tiezhu
3139
* Company dtstack
3240
* Date 2020/8/27 星期四
3341
*/
3442
public class MysqlDirtyDataConsumer extends AbstractDirtyDataConsumer {
35-
//TODO 添加batchSize 和 定时任务
43+
3644
private static final long serialVersionUID = -2959753658786001679L;
3745

3846
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
3947

40-
private final Object LOCK_STR = new Object();
48+
private static final int CONN_VALID_TIME = 1000;
4149

42-
private boolean isCreatedTable = false;
50+
private static final Integer FIELD_NUMBER = 5;
4351

44-
private final String[] tableField = {"id", "dirtyData", "processTime", "cause", "field"};
52+
private final Object LOCK_STR = new Object();
4553

46-
private String SQL = "INSERT INTO ? (?, ?, ?, ?) VALUES (?, ?, ?, ?) ";
54+
private final String[] tableField = {"id", "dirtyData", "processTime", "cause", "field"};
4755

4856
private PreparedStatement statement;
4957

5058
private Connection connection;
5159

52-
private String tableName;
60+
private Long batchSize;
5361

54-
private void setStatement(String url,
55-
String userName,
56-
String password) throws ClassNotFoundException, SQLException {
62+
private void beforeConsume(String url, // opens the JDBC connection, optionally creates the table, then prepares the INSERT statement
63+
String userName,
64+
String password,
65+
String tableName,
66+
boolean isCreatedTable) throws ClassNotFoundException, SQLException {
5767
synchronized (LOCK_STR) {
5868
Class.forName(DRIVER_NAME);
59-
6069
connection = DriverManager.getConnection(url, userName, password);
61-
statement = connection.prepareStatement(SQL);
70+
71+
// create table for dirty data
72+
if (!isCreatedTable) { // the flag means "table already exists"; false triggers creation here
73+
createTable(tableName);
74+
}
75+
76+
String insertField = Arrays.stream(tableField)
77+
.map(this::quoteIdentifier)
78+
.collect(Collectors.joining(", "));
79+
String insertSql = "INSERT INTO " + quoteIdentifier(tableName)
80+
+ "(" + insertField + ") VALUES (?, ?, ?, ?, ?)"; // five placeholders, one per entry of tableField
81+
statement = connection.prepareStatement(insertSql);
6282
}
6383
}
6484

6585
private String quoteIdentifier(String tableName) { // despite the parameter name, beforeConsume also applies this to column names
66-
return "\"" + tableName + "\"";
86+
return "`" + tableName + "`"; // wraps the name in backticks; NOTE(review): a backtick inside the name is not escaped
6787
}
6888

6989
/**
7090
* Creates the table that stores dirty data.
7191
*
7292
* @param tableName name of the table
73-
* @return whether creation succeeded
7493
* @throws SQLException SQL exception
7594
*/
76-
private boolean createTable(String tableName) {
95+
private void createTable(String tableName) throws SQLException {
96+
Statement statement = null; // local Statement; shadows the PreparedStatement field of the same name
7797
try {
78-
String defaultTable = "";
7998
String sql =
80-
"CREATE TABLE ` " + tableName + "` (" +
99+
"CREATE TABLE IF NOT EXISTS \n"
100+
+ quoteIdentifier(tableName) + " (\n" +
81101
" `id` int(11) not null AUTO_INCREMENT,\n" +
82-
" `dirtyData` varchar(100) DEFAULT NULL,\n" +
83-
" `processTime` varchar(100) DEFAULT NULL,\n" +
84-
" `cause` date DEFAULT NULL,\n" +
85-
" `field` varchar(100) DEFAULT NULL,\n" +
102+
" `dirtyData` varchar(255) DEFAULT NULL,\n" + // NOTE(review): text columns are capped at 255 chars; confirm long records and causes fit
103+
" `processTime` varchar(255) DEFAULT NULL,\n" +
104+
" `cause` varchar(255) DEFAULT NULL,\n" +
105+
" `field` varchar(255) DEFAULT NULL,\n" +
86106
" PRIMARY KEY (id)\n" +
87107
") DEFAULT CHARSET=utf8;";
88-
return statement.execute(sql);
108+
statement = connection.createStatement();
109+
statement.execute(sql);
89110
} catch (SQLException e) {
90111
throw new RuntimeException("create table error !", e); // execution failures surface as RuntimeException, not SQLException
112+
} finally {
113+
if (statement != null && !statement.isClosed()) { // isClosed()/close() here are the only calls that can propagate SQLException
114+
statement.close();
115+
}
91116
}
92117
}
93118

94119
@Override
95120
public void consume() throws Exception {
96-
if (!isCreatedTable) {
97-
createTable(tableName);
121+
DirtyDataEntity entity = queue.take(); // waits for the next record; presumably blocking, the queue's declared type is not visible here
122+
count++;
123+
List<String> data = new ArrayList<>();
124+
data.add(String.valueOf(count)); // running count becomes the first bound value, i.e. the id column; NOTE(review): count is per-instance, so ids may collide with rows already in a reused table
125+
Collections.addAll(data, entity.get()); // appends dirtyData, processDate, cause, field
126+
for (int i = 0; i < FIELD_NUMBER; i++) {
127+
statement.setString(i + 1, data.get(i)); // JDBC parameter indexes are 1-based
98128
}
99129

130+
statement.addBatch();
131+
132+
if (count % batchSize == 0) { // flush once every batchSize records; NOTE(review): a trailing partial batch is not flushed here; confirm close() calls executeBatch()
133+
statement.executeBatch();
134+
}
100135
}
101136

102137
@Override
103138
public void close() {
139+
isRunning.compareAndSet(true, false);
140+
104141
try {
105-
if (connection != null && !connection.isValid(1000)) {
142+
if (connection != null && !connection.isValid(CONN_VALID_TIME)) {
106143
connection.close();
107144
}
108145

@@ -116,11 +153,18 @@ public void close() {
116153

117154
@Override
118155
public void init(Map<String, String> properties) throws Exception {
119-
tableName = properties.get("tableName");
156+
SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");
157+
String tableName = properties.getOrDefault("tableName",
158+
"DirtyDataFromMysql_" + timeFormat.format(System.currentTimeMillis())); // default name carries the init-time timestamp
120159
String userName = properties.get("userName"); // no default: null when the key is absent
121160
String password = properties.get("password");
122161
String url = properties.get("url");
123-
isCreatedTable = Boolean.parseBoolean(properties.get("isCreatedTable"));
124-
setStatement(url, userName, password);
162+
batchSize = Long.parseLong(properties.getOrDefault("batchSize", "10000")); // records per executeBatch() in consume()
163+
errorLimit = Long.parseLong(properties.getOrDefault("errorLimit", "1000")); // overrides the inherited errorLimit field
164+
165+
boolean isCreatedTable = Boolean.parseBoolean(
166+
properties.getOrDefault("isCreatedTable", "false")); // false: beforeConsume will issue CREATE TABLE IF NOT EXISTS
167+
168+
beforeConsume(url, userName, password, tableName, isCreatedTable);
125169
}
126170
}

0 commit comments

Comments
 (0)