Skip to content

Commit 67fc5a4

Browse files
committed
[fix] fix some bug
1 parent dbbbec0 commit 67fc5a4

5 files changed

Lines changed: 12 additions & 9 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/dirtyManager/entity/DirtyDataEntity.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
package com.dtstack.flink.sql.dirtyManager.entity;
2020

2121
import java.sql.Date;
22+
import java.text.SimpleDateFormat;
2223

2324
/**
2425
* @author tiezhu
2526
* Company dtstack
2627
* Date 2020/8/27 星期四
2728
*/
2829
public class DirtyDataEntity {
30+
private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
2931
/**
3032
* 脏数据信息内容
3133
*/
@@ -34,7 +36,7 @@ public class DirtyDataEntity {
3436
/**
3537
* 脏数据处理时间
3638
*/
37-
private Date processDate;
39+
private String processDate;
3840

3941
/**
4042
* 产生脏数据的原因
@@ -54,11 +56,11 @@ public void setDirtyData(String dirtyData) {
5456
this.dirtyData = dirtyData;
5557
}
5658

57-
public Date getProcessDate() {
59+
public String getProcessDate() {
5860
return processDate;
5961
}
6062

61-
public void setProcessDate(Date processDate) {
63+
public void setProcessDate(String processDate) {
6264
this.processDate = processDate;
6365
}
6466

@@ -80,7 +82,7 @@ public void setField(String field) {
8082

8183
public DirtyDataEntity(String dirtyData, Long processDate, String cause, String field) {
8284
this.dirtyData = dirtyData;
83-
this.processDate = new Date(processDate);
85+
this.processDate = timeFormat.format(processDate);
8486
this.cause = cause;
8587
this.field = field;
8688
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,13 @@ private static AbstractDirtyDataConsumer createConsumer(Map<String, String> prop
123123

124124
/**
125125
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
126+
* TODO consumer 关闭时仍有数据没有消费到,假如有500条数据,在结束时实际消费数量可能只有493
126127
*/
127-
public void close() throws Exception {
128-
dirtyDataConsumer.shutdown();
128+
public void close() {
129129
if (checkConsumer()) {
130130
LOG.info("dirty consumer is closing ...");
131131
consumer.close();
132+
dirtyDataConsumer.shutdownNow();
132133
}
133134
}
134135

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public static void main(String[] args) throws Exception {
2727
Thread.sleep(100);
2828
manager.collectDirtyData("testDirtyData" + i,
2929
new Exception("testException" + i).getMessage(), "testField");
30-
if (i == 500) {
30+
if (i == 50) {
3131
manager.close();
3232
}
3333
}

dirtyDataManager/console/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<version>1.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
11+
<name>dirtyConsumer-console</name>
1212
<artifactId>dirtyConsumer.console</artifactId>
1313

1414
<build>

dirtyDataManager/mysql/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<version>1.0-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
11+
<name>dirtyConsumer-mysql</name>
1212
<properties>
1313
<mysql.connector.version>5.1.46</mysql.connector.version>
1414
</properties>

0 commit comments

Comments
 (0)