Skip to content

Commit 7ee482d

Browse files
committed
[feat] rename print to console and add TODO
1 parent 055bcf5 commit 7ee482d

9 files changed

Lines changed: 102 additions & 42 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.io.Serializable;
26+
import java.sql.SQLException;
2627
import java.util.Map;
2728
import java.util.concurrent.LinkedBlockingQueue;
2829
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,7 +60,14 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
5960
/**
6061
* 初始化消费者,初始化定时任务
6162
*/
62-
public abstract void init(Map<String, String> properties);
63+
public abstract void init(Map<String, String> properties) throws Exception;
64+
65+
/**
66+
* 检验consumer是否正在执行
67+
*/
68+
public boolean isRunning() {
69+
return isRunning.get();
70+
}
6371

6472
@Override
6573
public void run() {

core/src/main/java/com/dtstack/flink/sql/dirtyManager/entity/DirtyDataEntity.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,14 @@ public DirtyDataEntity(String dirtyData, Long processDate, String cause, String
8484
this.cause = cause;
8585
this.field = field;
8686
}
87+
88+
@Override
89+
public String toString() {
90+
return "DirtyDataEntity{" +
91+
"dirtyData='" + dirtyData + '\'' +
92+
", processDate=" + processDate +
93+
", cause='" + cause + '\'' +
94+
", field='" + field + '\'' +
95+
'}';
96+
}
8797
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
* Date 2020/8/27 星期四
4040
*/
4141
public class DirtyDataManager implements Serializable {
42+
//TODO 需要确保产生脏数据后至少一次是成功发送给了consumer,至于consumer是否成功消费,manager不需要关心
4243
private static final long serialVersionUID = 7190970299538893497L;
4344

4445
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
@@ -64,6 +65,11 @@ public class DirtyDataManager implements Serializable {
6465
*/
6566
private final AtomicLong count = new AtomicLong(0);
6667

68+
/**
69+
* 脏数据写入队列失败条数
70+
*/
71+
private final AtomicLong errorCount = new AtomicLong(0);
72+
6773
public static AbstractDirtyDataConsumer consumer;
6874

6975
/**
@@ -74,6 +80,7 @@ public static DirtyDataManager newInstance(Map<String, String> properties) throw
7480
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", "60"));
7581
consumer = createConsumer(properties);
7682
consumer.init(properties);
83+
//TODO 使用线程池创建线程,不要用thread
7784
Thread dirtyDataConsumer = new Thread(consumer.setQueue(manager.queue), "dirtyData Consumer");
7885
dirtyDataConsumer.start();
7986
return manager;
@@ -102,22 +109,32 @@ public void close() throws Exception {
102109
if (!queue.isEmpty() && checkConsumer()) {
103110
consumer.consume();
104111
}
105-
LOG.info("dirty consumer is closing ......");
112+
LOG.info("dirty consumer is closing ...");
106113
consumer.close();
107114
}
108115

109-
// 收集脏数据
110-
public void collectDirtyData(String dataInfo, String cause, String field) throws InterruptedException {
116+
/**
117+
* 收集脏数据放入队列缓存中,记录放入失败的数目和存入队列中的总数目,如果放入失败的数目超过一定比列,那么manager任务失败
118+
*/
119+
public void collectDirtyData(String dataInfo, String cause, String field) {
111120
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause, field);
112-
queue.offer(dirtyDataEntity, blockingInterval, TimeUnit.MILLISECONDS);
113-
count.incrementAndGet();
121+
try {
122+
queue.offer(dirtyDataEntity, blockingInterval, TimeUnit.MILLISECONDS);
123+
count.incrementAndGet();
124+
} catch (Exception ignored) {
125+
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
126+
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
127+
if (errorCount.get() > Math.ceil(count.longValue() * 0.8)) {
128+
throw new RuntimeException("The number of failed number reaches the limit, manager fails");
129+
}
130+
}
114131
}
115132

116133
/**
117134
* 查看consumer当前状态
118135
*/
119136
public boolean checkConsumer() {
120-
return consumer.isRunning.get();
137+
return consumer.isRunning();
121138
}
122139

123140
/**

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
public class TestMain {
1212
public static void main(String[] args) throws Exception {
1313
Map<String, String> properties = new HashMap<>();
14-
properties.put("type", "print");
14+
properties.put("type", "console");
1515
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQL/sqlplugins");
1616

1717
DirtyDataManager manager = DirtyDataManager.newInstance(properties);
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

12-
<artifactId>dirtyConsumer.print</artifactId>
12+
<artifactId>dirtyConsumer.console</artifactId>
1313

1414
<build>
1515
<plugins>
@@ -64,8 +64,8 @@
6464
</fileset>
6565
</copy>
6666

67-
<move file="${basedir}/../../sqlplugins/dirtyData/print/${project.artifactId}-${project.version}.jar"
68-
tofile="${basedir}/../../sqlplugins/dirtyData/print/${project.name}.jar"/>
67+
<move file="${basedir}/../../sqlplugins/dirtyData/console/${project.artifactId}-${project.version}.jar"
68+
tofile="${basedir}/../../sqlplugins/dirtyData/console/${project.name}.jar"/>
6969
</tasks>
7070
</configuration>
7171
</execution>

dirtyDataManager/print/src/main/java/com/dtstack/flink/sql/dirty/print/PrintDirtyDataConsumer.java renamed to dirtyDataManager/console/src/main/java/com/dtstack/flink/sql/dirty/print/ConsoleDirtyDataConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
* Company dtstack
3131
* Date 2020/8/27 星期四
3232
*/
33-
public class PrintDirtyDataConsumer extends AbstractDirtyDataConsumer {
33+
public class ConsoleDirtyDataConsumer extends AbstractDirtyDataConsumer {
3434
private static final long serialVersionUID = 5727194679865135189L;
3535

36-
private final Logger LOG = LoggerFactory.getLogger(PrintDirtyDataConsumer.class);
36+
private final Logger LOG = LoggerFactory.getLogger(ConsoleDirtyDataConsumer.class);
3737

3838
@Override
3939
public void consume() throws InterruptedException {
@@ -48,10 +48,11 @@ public void consume() throws InterruptedException {
4848
@Override
4949
public void close() {
5050
isRunning.compareAndSet(true, false);
51+
LOG.info("console dirty consumer close ...");
5152
}
5253

5354
@Override
5455
public void init(Map<String, String> properties) {
55-
LOG.info("Log dirty consumer init ......");
56+
LOG.info("console dirty consumer init ...");
5657
}
5758
}

dirtyDataManager/print/src/test/java/com/dtstack/flink/sql/dirty/print/TestPrintDirtyDataConsumer.java renamed to dirtyDataManager/console/src/test/java/com/dtstack/flink/sql/dirty/print/TestPrintDirtyDataConsumer.java

File renamed without changes.

dirtyDataManager/mysql/src/main/java/com/dtstack/flink/sql/dirty/mysql/MysqlDirtyDataConsumer.java

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,29 @@
1919
package com.dtstack.flink.sql.dirty.mysql;
2020

2121
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
22-
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
2322

2423
import java.sql.Connection;
2524
import java.sql.DriverManager;
2625
import java.sql.PreparedStatement;
2726
import java.sql.SQLException;
2827
import java.util.Map;
29-
import java.util.concurrent.LinkedBlockingQueue;
3028

3129
/**
3230
* @author tiezhu
3331
* Company dtstack
3432
* Date 2020/8/27 星期四
3533
*/
3634
public class MysqlDirtyDataConsumer extends AbstractDirtyDataConsumer {
35+
//TODO 添加batchSize 和 定时任务
3736
private static final long serialVersionUID = -2959753658786001679L;
3837

3938
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
4039

40+
private final Object LOCK_STR = new Object();
41+
4142
private boolean isCreatedTable = false;
4243

43-
private String[] tableField = {"id", "dirtyData", "processTime", "cause", "field"};
44+
private final String[] tableField = {"id", "dirtyData", "processTime", "cause", "field"};
4445

4546
private String SQL = "INSERT INTO ? (?, ?, ?, ?) VALUES (?, ?, ?, ?) ";
4647

@@ -50,14 +51,15 @@ public class MysqlDirtyDataConsumer extends AbstractDirtyDataConsumer {
5051

5152
private String tableName;
5253

53-
private PreparedStatement getStatement(String url,
54-
String userName,
55-
String password) throws ClassNotFoundException, SQLException {
56-
Class.forName(DRIVER_NAME);
54+
private void setStatement(String url,
55+
String userName,
56+
String password) throws ClassNotFoundException, SQLException {
57+
synchronized (LOCK_STR) {
58+
Class.forName(DRIVER_NAME);
5759

58-
connection = DriverManager.getConnection(url, userName, password);
59-
statement = connection.prepareStatement(SQL);
60-
return statement;
60+
connection = DriverManager.getConnection(url, userName, password);
61+
statement = connection.prepareStatement(SQL);
62+
}
6163
}
6264

6365
private String quoteIdentifier(String tableName) {
@@ -66,37 +68,59 @@ private String quoteIdentifier(String tableName) {
6668

6769
/**
6870
* 创建存储脏数据的表
69-
* @param tableName 表名
71+
*
72+
* @param tableName 表名
7073
* @return 是否创建成功
7174
* @throws SQLException SQL异常
7275
*/
73-
private boolean createTable(String tableName) throws SQLException {
74-
String sql =
75-
"CREATE TABLE ` " + tableName + "` (" +
76-
" `id` int(11) not null AUTO_INCREMENT,\n" +
77-
" `dirtyData` varchar(100) DEFAULT NULL,\n" +
78-
" `processTime` varchar(100) DEFAULT NULL,\n" +
79-
" `cause` date DEFAULT NULL,\n" +
80-
" `field` varchar(100) DEFAULT NULL,\n" +
81-
" PRIMARY KEY (id)\n" +
82-
") DEFAULT CHARSET=utf8;";
83-
return statement.execute(sql);
76+
private boolean createTable(String tableName) {
77+
try {
78+
String defaultTable = "";
79+
String sql =
80+
"CREATE TABLE ` " + tableName + "` (" +
81+
" `id` int(11) not null AUTO_INCREMENT,\n" +
82+
" `dirtyData` varchar(100) DEFAULT NULL,\n" +
83+
" `processTime` varchar(100) DEFAULT NULL,\n" +
84+
" `cause` date DEFAULT NULL,\n" +
85+
" `field` varchar(100) DEFAULT NULL,\n" +
86+
" PRIMARY KEY (id)\n" +
87+
") DEFAULT CHARSET=utf8;";
88+
return statement.execute(sql);
89+
} catch (SQLException e) {
90+
throw new RuntimeException("create table error !", e);
91+
}
8492
}
8593

8694
@Override
87-
public void consume() throws Exception{
95+
public void consume() throws Exception {
8896
if (!isCreatedTable) {
89-
createTable("tableName");
97+
createTable(tableName);
9098
}
99+
91100
}
92101

93102
@Override
94103
public void close() {
95-
104+
try {
105+
if (connection != null && !connection.isValid(1000)) {
106+
connection.close();
107+
}
108+
109+
if (statement != null && !statement.isClosed()) {
110+
statement.close();
111+
}
112+
} catch (SQLException e) {
113+
throw new RuntimeException("close mysql resource error !");
114+
}
96115
}
97116

98117
@Override
99-
public void init(Map<String, String> properties) {
100-
118+
public void init(Map<String, String> properties) throws Exception {
119+
tableName = properties.get("tableName");
120+
String userName = properties.get("userName");
121+
String password = properties.get("password");
122+
String url = properties.get("url");
123+
isCreatedTable = Boolean.parseBoolean(properties.get("isCreatedTable"));
124+
setStatement(url, userName, password);
101125
}
102126
}

dirtyDataManager/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<artifactId>sql.dirtyConsumer</artifactId>
1313
<packaging>pom</packaging>
1414
<modules>
15-
<module>print</module>
15+
<module>console</module>
1616
<module>mysql</module>
1717
</modules>
1818

0 commit comments

Comments
 (0)