Skip to content

Commit 50c469a

Browse files
committed
Merge remote-tracking branch 'origin/feat_1.10_dirtyDataConsumer' into feat_1.10_dirtyDataConsumer
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java # dirtyDataManager/mysql/src/main/java/com/dtstack/flink/sql/dirty/mysql/MysqlDirtyDataConsumer.java
2 parents 80f0ca9 + 67fc5a4 commit 50c469a

8 files changed

Lines changed: 156 additions & 77 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.io.Serializable;
26-
import java.sql.SQLException;
2726
import java.util.Map;
2827
import java.util.concurrent.LinkedBlockingQueue;
2928
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicLong;
3030

3131
/**
3232
* @author tiezhu
@@ -38,17 +38,19 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
3838

3939
protected static final Logger LOG = LoggerFactory.getLogger(AbstractDirtyDataConsumer.class);
4040

41-
protected long errorLimit = 1000;
42-
protected long errorCount = 0;
41+
protected Long errorLimit = 1000L;
42+
protected AtomicLong errorCount = new AtomicLong(0L);
4343

44-
protected long count = 0;
44+
protected AtomicLong count = new AtomicLong(0L);
4545

4646
public AtomicBoolean isRunning = new AtomicBoolean(true);
4747

4848
protected LinkedBlockingQueue<DirtyDataEntity> queue;
4949

5050
/**
5151
* 消费队列数据
52+
*
53+
* @throws Exception throw exception
5254
*/
5355
public abstract void consume() throws Exception;
5456

@@ -59,6 +61,9 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
5961

6062
/**
6163
* 初始化消费者,初始化定时任务
64+
*
65+
* @param properties 任务参数
66+
* @throws Exception throw exception
6267
*/
6368
public abstract void init(Map<String, String> properties) throws Exception;
6469

@@ -78,10 +83,10 @@ public void run() {
7883
}
7984
LOG.info("consume dirty data end");
8085
} catch (Exception e) {
81-
LOG.error("consume dirtyData error");
82-
errorCount++;
83-
if (errorCount == errorLimit) {
84-
throw new RuntimeException("脏数据消费失败达到上限,任务失败");
86+
LOG.error("consume dirtyData error", e);
87+
errorCount.incrementAndGet();
88+
if (errorCount.get() == errorLimit) {
89+
throw new RuntimeException("The task failed due to the number of dirty data consume failed reached the limit " + errorLimit);
8590
}
8691
}
8792
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/entity/DirtyDataEntity.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
package com.dtstack.flink.sql.dirtyManager.entity;
2020

2121
import java.sql.Date;
22+
import java.text.SimpleDateFormat;
2223

2324
/**
2425
* @author tiezhu
2526
* Company dtstack
2627
* Date 2020/8/27 星期四
2728
*/
2829
public class DirtyDataEntity {
30+
private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
2931
/**
3032
* 脏数据信息内容
3133
*/
@@ -34,7 +36,7 @@ public class DirtyDataEntity {
3436
/**
3537
* 脏数据处理时间
3638
*/
37-
private Date processDate;
39+
private String processDate;
3840

3941
/**
4042
* 产生脏数据的原因
@@ -54,11 +56,11 @@ public void setDirtyData(String dirtyData) {
5456
this.dirtyData = dirtyData;
5557
}
5658

57-
public Date getProcessDate() {
59+
public String getProcessDate() {
5860
return processDate;
5961
}
6062

61-
public void setProcessDate(Date processDate) {
63+
public void setProcessDate(String processDate) {
6264
this.processDate = processDate;
6365
}
6466

@@ -80,7 +82,7 @@ public void setField(String field) {
8082

8183
public DirtyDataEntity(String dirtyData, Long processDate, String cause, String field) {
8284
this.dirtyData = dirtyData;
83-
this.processDate = new Date(processDate);
85+
this.processDate = timeFormat.format(processDate);
8486
this.cause = cause;
8587
this.field = field;
8688
}
@@ -94,4 +96,12 @@ public String toString() {
9496
", field='" + field + '\'' +
9597
'}';
9698
}
99+
100+
/**
101+
* 获取脏数据信息,返回字符数组
102+
* @return 脏数据信息字符数组
103+
*/
104+
public String[] get() {
105+
return new String[] {dirtyData, String.valueOf(processDate), cause, field};
106+
}
97107
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2222
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
2323
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
24+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2425
import com.dtstack.flink.sql.util.PluginUtil;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
@@ -29,10 +30,8 @@
2930
import java.io.Serializable;
3031
import java.lang.reflect.Constructor;
3132
import java.util.Map;
32-
import java.util.concurrent.ExecutorService;
33-
import java.util.concurrent.Executors;
34-
import java.util.concurrent.Future;
3533
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.concurrent.ThreadPoolExecutor;
3635
import java.util.concurrent.TimeUnit;
3736
import java.util.concurrent.atomic.AtomicLong;
3837

@@ -42,6 +41,7 @@
4241
* Date 2020/8/27 星期四
4342
*/
4443
public class DirtyDataManager implements Serializable {
44+
4545
private static final long serialVersionUID = 7190970299538893497L;
4646

4747
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
@@ -55,7 +55,7 @@ public class DirtyDataManager implements Serializable {
5555
/**
5656
* 写入队列阻塞时间
5757
*/
58-
private long blockingInterval = 60;
58+
private long blockingInterval;
5959

6060
/**
6161
* 缓存脏数据信息队列
@@ -72,27 +72,44 @@ public class DirtyDataManager implements Serializable {
7272
*/
7373
private final AtomicLong errorCount = new AtomicLong(0);
7474

75-
public AbstractDirtyDataConsumer consumer;
75+
private double errorLimitRate;
76+
77+
public static AbstractDirtyDataConsumer consumer;
78+
79+
private static ThreadPoolExecutor dirtyDataConsumer;
80+
81+
public final static int MAX_POOL_SIZE_LIMIT = 5;
7682

77-
private final ExecutorService executor = Executors.newSingleThreadExecutor();
83+
private final static int MAX_TASK_QUEUE_SIZE = 100;
84+
85+
private final static String DEFAULT_TYPE = "console";
86+
87+
private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8";
88+
89+
private final static String DEFAULT_BLOCKING_INTERVAL = "60";
7890

7991
/**
8092
* 通过参数生成manager实例,并同时将consumer实例化
8193
*/
8294
public static DirtyDataManager newInstance(Map<String, String> properties) throws Exception {
8395
DirtyDataManager manager = new DirtyDataManager();
84-
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", "60"));
85-
manager.consumer = createConsumer(properties);
86-
manager.consumer.init(properties);
87-
manager.executor.execute(manager.consumer);
96+
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", DEFAULT_BLOCKING_INTERVAL));
97+
manager.errorLimitRate = Double.parseDouble(properties.getOrDefault("errorLimitRate", DEFAULT_ERROR_LIMIT_RATE));
98+
consumer = createConsumer(properties);
99+
consumer.init(properties);
100+
consumer.setQueue(manager.queue);
101+
dirtyDataConsumer = new ThreadPoolExecutor(MAX_POOL_SIZE_LIMIT, MAX_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
102+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer"), new ThreadPoolExecutor.CallerRunsPolicy());
103+
dirtyDataConsumer.execute(consumer);
104+
88105
return manager;
89106
}
90107

91108
/**
92109
* 通过动态加载的方式加载Consumer
93110
*/
94111
private static AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
95-
String type = properties.getOrDefault("type", "print");
112+
String type = properties.getOrDefault("type", DEFAULT_TYPE);
96113
String consumerType = DIRTY_CONSUMER_PATH + File.separator + type;
97114
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, properties.getOrDefault("pluginPath", null), "shipfile");
98115
String className = CLASS_PRE_STR + "." + type.toLowerCase() + "." + upperCaseFirstChar(type + CLASS_POST_STR);
@@ -106,18 +123,18 @@ private static AbstractDirtyDataConsumer createConsumer(Map<String, String> prop
106123

107124
/**
108125
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
126+
* TODO consumer 关闭时仍有数据没有消费到,假如有500条数据,在结束时实际消费数量可能只有493
109127
*/
110128
public void close() {
111-
if (!queue.isEmpty() && checkConsumer()) {
112-
executor.shutdown();
129+
if (checkConsumer()) {
130+
LOG.info("dirty consumer is closing ...");
131+
consumer.close();
132+
dirtyDataConsumer.shutdownNow();
113133
}
114-
LOG.info("dirty consumer is closing ...");
115-
this.consumer.isRunning.compareAndSet(true, false);
116-
executor.shutdownNow();
117134
}
118135

119136
/**
120-
* 收集脏数据放入队列缓存中,记录放入失败的数目和存入队列中的总数目,如果放入失败的数目超过一定比列,那么manager任务失败
137+
* 收集脏数据放入队列缓存中,记录放入失败的数目和存入队列中的总数目,如果放入失败的数目超过一定比例,那么manager任务失败
121138
*/
122139
public void collectDirtyData(String dataInfo, String cause, String field) {
123140
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause, field);
@@ -127,7 +144,7 @@ public void collectDirtyData(String dataInfo, String cause, String field) {
127144
} catch (Exception ignored) {
128145
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
129146
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
130-
if (errorCount.get() > Math.ceil(count.longValue() * 0.8)) {
147+
if (errorCount.get() > Math.ceil(count.longValue() * errorLimitRate)) {
131148
throw new RuntimeException("The number of failed number reaches the limit, manager fails");
132149
}
133150
}
@@ -137,7 +154,7 @@ public void collectDirtyData(String dataInfo, String cause, String field) {
137154
* 查看consumer当前状态
138155
*/
139156
public boolean checkConsumer() {
140-
return this.consumer.isRunning();
157+
return consumer.isRunning();
141158
}
142159

143160
/**

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,27 @@
99
* Date 2020/8/28 星期五
1010
*/
1111
public class TestMain {
12+
private static final Integer DATA_NUMBER = 1000;
13+
1214
public static void main(String[] args) throws Exception {
13-
Map<String, String> properties = new HashMap<>();
14-
properties.put("type", "console");
15-
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQL/sqlplugins");
15+
Map<String, String> properties = new HashMap<>(8);
16+
properties.put("type", "mysql");
17+
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQLTemp/sqlplugins");
18+
properties.put("url", "jdbc:mysql://kerberos01:3306/tiezhu");
19+
properties.put("userName", "dtstack");
20+
properties.put("password", "abc123");
21+
properties.put("isCreatedTable", "false");
22+
properties.put("batchSize", "1");
23+
properties.put("tableName", "DirtyDataFromMysql_2020_09_14_10_51_50");
1624

1725
DirtyDataManager manager = DirtyDataManager.newInstance(properties);
18-
for (int i = 0; i < 100; i++) {
19-
Thread.sleep(1000);
20-
manager.collectDirtyData("testDirtyData", new Exception("testException").getMessage(), "testField");
26+
for (int i = 0; i < DATA_NUMBER; i++) {
27+
Thread.sleep(100);
28+
manager.collectDirtyData("testDirtyData" + i,
29+
new Exception("testException" + i).getMessage(), "testField");
30+
if (i == 50) {
31+
manager.close();
32+
}
2133
}
2234
}
2335
}

dirtyDataManager/console/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<version>1.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
11+
<name>dirtyConsumer-console</name>
1212
<artifactId>dirtyConsumer.console</artifactId>
1313

1414
<build>
@@ -58,7 +58,7 @@
5858
</goals>
5959
<configuration>
6060
<tasks>
61-
<copy todir="${basedir}/../../sqlplugins/dirtyData/print">
61+
<copy todir="${basedir}/../../sqlplugins/dirtyData/console">
6262
<fileset dir="target/">
6363
<include name="${project.artifactId}-${project.version}.jar"/>
6464
</fileset>

dirtyDataManager/console/src/main/java/com/dtstack/flink/sql/dirty/print/ConsoleDirtyDataConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class ConsoleDirtyDataConsumer extends AbstractDirtyDataConsumer {
3838
@Override
3939
public void consume() throws InterruptedException {
4040
DirtyDataEntity dataEntity = queue.take();
41-
count++;
41+
count.incrementAndGet();
4242
LOG.warn("get dirtyData: " + dataEntity.getDirtyData() + "\n"
4343
+ "cause: " + dataEntity.getCause() + "\n"
4444
+ "processTime: " + dataEntity.getProcessDate() + "\n"

dirtyDataManager/mysql/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<version>1.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
11+
<name>dirtyConsumer-mysql</name>
1212
<properties>
1313
<mysql.connector.version>5.1.46</mysql.connector.version>
1414
</properties>

0 commit comments

Comments
 (0)