Skip to content

Commit 055bcf5

Browse files
committed
[feat] 完成 print dirty consumer
1 parent 684b9b1 commit 055bcf5

9 files changed

Lines changed: 160 additions & 45 deletions

File tree

core/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,17 @@
123123
<artifactId>joda-time</artifactId>
124124
<version>2.5</version>
125125
</dependency>
126+
<dependency>
127+
<groupId>org.slf4j</groupId>
128+
<artifactId>slf4j-simple</artifactId>
129+
<version>1.7.30</version>
130+
</dependency>
131+
132+
<dependency>
133+
<groupId>log4j</groupId>
134+
<artifactId>log4j</artifactId>
135+
<version>1.2.17</version>
136+
</dependency>
126137
</dependencies>
127138

128139
<build>

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,37 @@
1919
package com.dtstack.flink.sql.dirtyManager.consumer;
2020

2121
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2224

2325
import java.io.Serializable;
26+
import java.util.Map;
2427
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2529

2630
/**
2731
* @author tiezhu
2832
* Company dtstack
2933
* Date 2020/8/27 星期四
3034
*/
31-
public abstract class AbstractDirtyDataConsumer extends Thread implements Serializable {
32-
private static final long serialVersionUID = -6058598201315176687L;
35+
public abstract class AbstractDirtyDataConsumer implements Runnable, Serializable {
36+
protected static final long serialVersionUID = -6058598201315176687L;
3337

34-
private long count = 0;
38+
protected static final Logger LOG = LoggerFactory.getLogger(AbstractDirtyDataConsumer.class);
39+
40+
protected long errorLimit = 1000;
41+
protected long errorCount = 0;
42+
43+
protected long count = 0;
44+
45+
public AtomicBoolean isRunning = new AtomicBoolean(true);
3546

3647
protected LinkedBlockingQueue<DirtyDataEntity> queue;
3748

3849
/**
3950
* 消费队列数据
4051
*/
41-
public abstract void consume() throws InterruptedException;
52+
public abstract void consume() throws Exception;
4253

4354
/**
4455
* 关闭消费者,需要释放资源
@@ -48,16 +59,26 @@ public abstract class AbstractDirtyDataConsumer extends Thread implements Seria
4859
/**
4960
* 初始化消费者,初始化定时任务
5061
*/
51-
public void init() {
52-
53-
}
62+
public abstract void init(Map<String, String> properties);
5463

5564
@Override
5665
public void run() {
57-
66+
try {
67+
LOG.info("start to consume dirty data");
68+
while (isRunning.get()) {
69+
consume();
70+
}
71+
} catch (Exception e) {
72+
LOG.error("consume dirtyData error");
73+
errorCount++;
74+
if (errorCount == errorLimit) {
75+
throw new RuntimeException("脏数据消费失败达到上限,任务失败");
76+
}
77+
}
5878
}
5979

60-
public void setQueue(LinkedBlockingQueue<DirtyDataEntity> queue) {
80+
public AbstractDirtyDataConsumer setQueue(LinkedBlockingQueue<DirtyDataEntity> queue) {
6181
this.queue = queue;
82+
return this;
6283
}
6384
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
2323
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
2424
import com.dtstack.flink.sql.util.PluginUtil;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2527

2628
import java.io.File;
2729
import java.io.Serializable;
2830
import java.lang.reflect.Constructor;
29-
import java.net.URL;
3031
import java.util.Map;
31-
import java.util.Set;
3232
import java.util.concurrent.LinkedBlockingQueue;
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +41,8 @@
4141
public class DirtyDataManager implements Serializable {
4242
private static final long serialVersionUID = 7190970299538893497L;
4343

44+
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
45+
4446
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
4547

4648
private static final String CLASS_POST_STR = "DirtyDataConsumer";
@@ -55,53 +57,72 @@ public class DirtyDataManager implements Serializable {
5557
/**
5658
* 缓存脏数据信息队列
5759
*/
58-
private LinkedBlockingQueue<DirtyDataEntity> queue = new LinkedBlockingQueue<>();
60+
public final LinkedBlockingQueue<DirtyDataEntity> queue = new LinkedBlockingQueue<>();
5961

6062
/**
6163
* 统计manager收集到的脏数据条数
6264
*/
63-
private AtomicLong count = new AtomicLong(0);
64-
65-
private AbstractDirtyDataConsumer consumer;
65+
private final AtomicLong count = new AtomicLong(0);
6666

67-
public static DirtyDataManager newInstance() {
67+
public static AbstractDirtyDataConsumer consumer;
6868

69-
return new DirtyDataManager();
69+
/**
70+
* 通过参数生成manager实例,并同时将consumer实例化
71+
*/
72+
public static DirtyDataManager newInstance(Map<String, String> properties) throws Exception {
73+
DirtyDataManager manager = new DirtyDataManager();
74+
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", "60"));
75+
consumer = createConsumer(properties);
76+
consumer.init(properties);
77+
Thread dirtyDataConsumer = new Thread(consumer.setQueue(manager.queue), "dirtyData Consumer");
78+
dirtyDataConsumer.start();
79+
return manager;
7080
}
7181

72-
public AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
73-
// 利用类加载的方式动态加载
82+
/**
83+
* 通过动态加载的方式加载Consumer
84+
*/
85+
private static AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
7486
String type = properties.getOrDefault("type", "print");
7587
String consumerType = DIRTY_CONSUMER_PATH + File.separator + type;
7688
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, properties.getOrDefault("pluginPath", null), "shipfile");
7789
String className = CLASS_PRE_STR + "." + type.toLowerCase() + "." + upperCaseFirstChar(type + CLASS_POST_STR);
7890

7991
return ClassLoaderManager.newInstance(consumerJar, cl -> {
8092
Class<?> clazz = cl.loadClass(className);
81-
Constructor<?> constructor = clazz.getConstructor(String.class);
82-
return (AbstractDirtyDataConsumer) constructor.newInstance(type + CLASS_POST_STR);
93+
Constructor<?> constructor = clazz.getConstructor();
94+
return (AbstractDirtyDataConsumer) constructor.newInstance();
8395
});
8496
}
8597

86-
public void close() throws InterruptedException {
87-
if (!queue.isEmpty()) {
88-
flush();
98+
/**
99+
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
100+
*/
101+
public void close() throws Exception {
102+
if (!queue.isEmpty() && checkConsumer()) {
103+
consumer.consume();
89104
}
90-
}
91-
92-
public void flush() throws InterruptedException {
93-
consumer.setQueue(queue);
94-
consumer.consume();
105+
LOG.info("dirty consumer is closing ......");
106+
consumer.close();
95107
}
96108

97109
// 收集脏数据
98110
public void collectDirtyData(String dataInfo, String cause, String field) throws InterruptedException {
99111
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause, field);
100112
queue.offer(dirtyDataEntity, blockingInterval, TimeUnit.MILLISECONDS);
101113
count.incrementAndGet();
102-
consumer.setQueue(queue);
103114
}
104115

116+
/**
117+
* 查看consumer当前状态
118+
*/
119+
public boolean checkConsumer() {
120+
return consumer.isRunning.get();
121+
}
122+
123+
/**
124+
* 首字母大写
125+
*/
105126
private static String upperCaseFirstChar(String str) {
106127
return str.substring(0, 1).toUpperCase() + str.substring(1);
107128
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.dtstack.flink.sql.dirtyManager.manager;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
/**
7+
* @author tiezhu
8+
* Company dtstack
9+
* Date 2020/8/28 星期五
10+
*/
11+
public class TestMain {
12+
public static void main(String[] args) throws Exception {
13+
Map<String, String> properties = new HashMap<>();
14+
properties.put("type", "print");
15+
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQL/sqlplugins");
16+
17+
DirtyDataManager manager = DirtyDataManager.newInstance(properties);
18+
for (int i = 0; i < 100; i++) {
19+
Thread.sleep(1000);
20+
manager.collectDirtyData("testDirtyData", new Exception("testException").getMessage(), "testField");
21+
}
22+
}
23+
}
Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package com.dtstack.flink.sql.dirtyManager;
22

3-
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
43
import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
5-
import org.junit.Assert;
6-
import org.junit.Test;
74

85
import java.util.HashMap;
96
import java.util.Map;
@@ -14,14 +11,5 @@
1411
* Date 2020/8/28 星期五
1512
*/
1613
public class TestDirtyDataManager {
17-
@Test
18-
public void testCreateConsumer() throws Exception {
19-
Map<String, String> properties = new HashMap<>();
20-
properties.put("type", "print");
21-
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQL/sqlplugins");
2214

23-
DirtyDataManager manager = DirtyDataManager.newInstance();
24-
AbstractDirtyDataConsumer consumer = manager.createConsumer(properties);
25-
Assert.assertNotNull(consumer);
26-
}
2715
}

dirtyDataManager/mysql/src/main/java/com/dtstack/flink/sql/dirty/mysql/MysqlDirtyDataConsumer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
package com.dtstack.flink.sql.dirty.mysql;
2020

2121
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
22+
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
2223

2324
import java.sql.Connection;
2425
import java.sql.DriverManager;
2526
import java.sql.PreparedStatement;
2627
import java.sql.SQLException;
28+
import java.util.Map;
29+
import java.util.concurrent.LinkedBlockingQueue;
2730

2831
/**
2932
* @author tiezhu
@@ -45,6 +48,8 @@ public class MysqlDirtyDataConsumer extends AbstractDirtyDataConsumer {
4548

4649
private Connection connection;
4750

51+
private String tableName;
52+
4853
private PreparedStatement getStatement(String url,
4954
String userName,
5055
String password) throws ClassNotFoundException, SQLException {
@@ -79,12 +84,19 @@ private boolean createTable(String tableName) throws SQLException {
7984
}
8085

8186
@Override
82-
public void consume() {
83-
87+
public void consume() throws Exception{
88+
if (!isCreatedTable) {
89+
createTable("tableName");
90+
}
8491
}
8592

8693
@Override
8794
public void close() {
8895

8996
}
97+
98+
@Override
99+
public void init(Map<String, String> properties) {
100+
101+
}
90102
}

dirtyDataManager/print/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,24 @@
7373
</plugin>
7474
</plugins>
7575
</build>
76+
<dependencies>
77+
<dependency>
78+
<groupId>junit</groupId>
79+
<artifactId>junit</artifactId>
80+
<version>4.8.2</version>
81+
<scope>test</scope>
82+
</dependency>
83+
<dependency>
84+
<groupId>org.slf4j</groupId>
85+
<artifactId>slf4j-simple</artifactId>
86+
<version>1.7.30</version>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>log4j</groupId>
91+
<artifactId>log4j</artifactId>
92+
<version>1.2.17</version>
93+
</dependency>
94+
</dependencies>
7695

7796
</project>

dirtyDataManager/print/src/main/java/com/dtstack/flink/sql/dirty/print/PrintDirtyDataConsumer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

26+
import java.util.Map;
27+
2628
/**
2729
* @author tiezhu
2830
* Company dtstack
@@ -36,6 +38,7 @@ public class PrintDirtyDataConsumer extends AbstractDirtyDataConsumer {
3638
@Override
3739
public void consume() throws InterruptedException {
3840
DirtyDataEntity dataEntity = queue.take();
41+
count++;
3942
LOG.warn("get dirtyData: " + dataEntity.getDirtyData() + "\n"
4043
+ "cause: " + dataEntity.getCause() + "\n"
4144
+ "processTime: " + dataEntity.getProcessDate() + "\n"
@@ -44,6 +47,11 @@ public void consume() throws InterruptedException {
4447

4548
@Override
4649
public void close() {
47-
// Do nothing
50+
isRunning.compareAndSet(true, false);
51+
}
52+
53+
@Override
54+
public void init(Map<String, String> properties) {
55+
LOG.info("Log dirty consumer init ......");
4856
}
4957
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.dtstack.flink.sql.dirty.print;
2+
3+
/**
4+
* @author tiezhu
5+
* Company dtstack
6+
* Date 2020/8/28 星期五
7+
*/
8+
public class TestPrintDirtyDataConsumer {
9+
public static void main(String[] args) {
10+
11+
}
12+
}

0 commit comments

Comments
 (0)