Skip to content

Commit dbbbec0

Browse files
committed
[opt] code opt
1 parent 1ed345a commit dbbbec0

5 files changed

Lines changed: 28 additions & 19 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.concurrent.LinkedBlockingQueue;
2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicLong;
2930

3031
/**
3132
* @author tiezhu
@@ -38,9 +39,9 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
3839
protected static final Logger LOG = LoggerFactory.getLogger(AbstractDirtyDataConsumer.class);
3940

4041
protected Long errorLimit = 1000L;
41-
protected Long errorCount = 0L;
42+
protected AtomicLong errorCount = new AtomicLong(0L);
4243

43-
protected long count = 0;
44+
protected AtomicLong count = new AtomicLong(0L);
4445

4546
public AtomicBoolean isRunning = new AtomicBoolean(true);
4647

@@ -82,9 +83,9 @@ public void run() {
8283
}
8384
} catch (Exception e) {
8485
LOG.error("consume dirtyData error", e);
85-
errorCount++;
86-
if (errorCount.equals(errorLimit)) {
87-
throw new RuntimeException("脏数据消费失败达到上限,任务失败");
86+
errorCount.incrementAndGet();
87+
if (errorCount.get() == errorLimit) {
88+
throw new RuntimeException("The task failed due to the number of dirty data consume failed reached the limit " + errorLimit);
8889
}
8990
}
9091
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class DirtyDataManager implements Serializable {
5555
/**
5656
* 写入队列阻塞时间
5757
*/
58-
private long blockingInterval = 60;
58+
private long blockingInterval;
5959

6060
/**
6161
* 缓存脏数据信息队列
@@ -72,6 +72,8 @@ public class DirtyDataManager implements Serializable {
7272
*/
7373
private final AtomicLong errorCount = new AtomicLong(0);
7474

75+
private double errorLimitRate;
76+
7577
public static AbstractDirtyDataConsumer consumer;
7678

7779
private static ThreadPoolExecutor dirtyDataConsumer;
@@ -82,12 +84,17 @@ public class DirtyDataManager implements Serializable {
8284

8385
private final static String DEFAULT_TYPE = "console";
8486

87+
private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8";
88+
89+
private final static String DEFAULT_BLOCKING_INTERVAL = "60";
90+
8591
/**
8692
* 通过参数生成manager实例,并同时将consumer实例化
8793
*/
8894
public static DirtyDataManager newInstance(Map<String, String> properties) throws Exception {
8995
DirtyDataManager manager = new DirtyDataManager();
90-
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", "60"));
96+
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", DEFAULT_BLOCKING_INTERVAL));
97+
manager.errorLimitRate = Double.parseDouble(properties.getOrDefault("errorLimitRate", DEFAULT_ERROR_LIMIT_RATE));
9198
consumer = createConsumer(properties);
9299
consumer.init(properties);
93100
consumer.setQueue(manager.queue);
@@ -119,11 +126,10 @@ private static AbstractDirtyDataConsumer createConsumer(Map<String, String> prop
119126
*/
120127
public void close() throws Exception {
121128
dirtyDataConsumer.shutdown();
122-
if (!queue.isEmpty() && checkConsumer()) {
123-
consumer.consume();
129+
if (checkConsumer()) {
130+
LOG.info("dirty consumer is closing ...");
131+
consumer.close();
124132
}
125-
LOG.info("dirty consumer is closing ...");
126-
consumer.close();
127133
}
128134

129135
/**
@@ -137,8 +143,7 @@ public void collectDirtyData(String dataInfo, String cause, String field) {
137143
} catch (Exception ignored) {
138144
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
139145
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
140-
// TODO 这个失败比例可以作出调整
141-
if (errorCount.get() > Math.ceil(count.longValue() * 0.8)) {
146+
if (errorCount.get() > Math.ceil(count.longValue() * errorLimitRate)) {
142147
throw new RuntimeException("The number of failed number reaches the limit, manager fails");
143148
}
144149
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* Date 2020/8/28 星期五
1010
*/
1111
public class TestMain {
12-
private static final Integer DATA_NUMBER = 10000;
12+
private static final Integer DATA_NUMBER = 1000;
1313

1414
public static void main(String[] args) throws Exception {
1515
Map<String, String> properties = new HashMap<>(8);
@@ -27,7 +27,9 @@ public static void main(String[] args) throws Exception {
2727
Thread.sleep(100);
2828
manager.collectDirtyData("testDirtyData" + i,
2929
new Exception("testException" + i).getMessage(), "testField");
30+
if (i == 500) {
31+
manager.close();
32+
}
3033
}
31-
manager.close();
3234
}
3335
}

dirtyDataManager/console/src/main/java/com/dtstack/flink/sql/dirty/print/ConsoleDirtyDataConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class ConsoleDirtyDataConsumer extends AbstractDirtyDataConsumer {
3838
@Override
3939
public void consume() throws InterruptedException {
4040
DirtyDataEntity dataEntity = queue.take();
41-
count++;
41+
count.incrementAndGet();
4242
LOG.warn("get dirtyData: " + dataEntity.getDirtyData() + "\n"
4343
+ "cause: " + dataEntity.getCause() + "\n"
4444
+ "processTime: " + dataEntity.getProcessDate() + "\n"

dirtyDataManager/mysql/src/main/java/com/dtstack/flink/sql/dirty/mysql/MysqlDirtyDataConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,18 @@ private void createTable(String tableName) throws SQLException {
119119
@Override
120120
public void consume() throws Exception {
121121
DirtyDataEntity entity = queue.take();
122-
count++;
122+
count.incrementAndGet();
123+
123124
List<String> data = new ArrayList<>();
124-
data.add(String.valueOf(count));
125+
data.add(String.valueOf(count.get()));
125126
Collections.addAll(data, entity.get());
126127
for (int i = 0; i < FIELD_NUMBER; i++) {
127128
statement.setString(i + 1, data.get(i));
128129
}
129130

130131
statement.addBatch();
131132

132-
if (count % batchSize == 0) {
133+
if (count.get() % batchSize == 0) {
133134
statement.executeBatch();
134135
}
135136
}

0 commit comments

Comments
 (0)