2727import org .slf4j .LoggerFactory ;
2828
2929import java .io .Serializable ;
30- import java .util .Properties ;
30+ import java .util .Map ;
31+ import java .util .Objects ;
3132import java .util .concurrent .LinkedBlockingQueue ;
3233import java .util .concurrent .ThreadPoolExecutor ;
3334import java .util .concurrent .TimeUnit ;
5051 * Date 2020/8/27 星期四
5152 */
5253public class DirtyDataManager implements Serializable {
54+ private static final long serialVersionUID = 1L ;
5355
5456 public final static int MAX_POOL_SIZE_LIMIT = 5 ;
55- private static final long serialVersionUID = 7190970299538893497L ;
5657 private static final Logger LOG = LoggerFactory .getLogger (DirtyDataManager .class );
5758 private final static int MAX_TASK_QUEUE_SIZE = 100 ;
58- public static AbstractDirtyDataConsumer consumer ;
5959
60- private static ThreadPoolExecutor dirtyDataConsumer ;
60+ private AbstractDirtyDataConsumer consumer ;
61+ private transient ThreadPoolExecutor dirtyDataConsumer ;
62+
63+ private static final DirtyDataManager INSTANCE = new DirtyDataManager ();
64+
6165 /**
6266 * 统计manager收集到的脏数据条数
6367 */
@@ -75,30 +79,51 @@ public class DirtyDataManager implements Serializable {
7579 */
7680 private double errorLimitRate ;
7781
82+ private DirtyDataManager () {
83+
84+ }
85+
7886 /**
7987 * 通过参数生成manager实例,并同时将consumer实例化
8088 */
81- public static DirtyDataManager newInstance (Properties properties ) {
89+ public static DirtyDataManager newInstance (Map < String , Object > properties ) {
8290 try {
83- DirtyDataManager manager = new DirtyDataManager ();
84- manager .blockingInterval = Long .parseLong (String .valueOf (properties .getOrDefault (DIRTY_BLOCK_STR , DEFAULT_BLOCKING_INTERVAL )));
85- manager .errorLimitRate = Double .parseDouble (String .valueOf (properties .getOrDefault (DIRTY_LIMIT_RATE_STR , DEFAULT_ERROR_LIMIT_RATE )));
86- consumer = DirtyConsumerFactory .getDirtyConsumer (
87- properties .getProperty (PLUGIN_TYPE_STR , DEFAULT_TYPE )
88- , properties .getProperty (PLUGIN_PATH_STR )
89- , properties .getProperty (PLUGIN_LOAD_MODE_STR )
90- );
91- consumer .init (properties );
92- consumer .setQueue (new LinkedBlockingQueue <>());
93- dirtyDataConsumer = new ThreadPoolExecutor (MAX_POOL_SIZE_LIMIT , MAX_POOL_SIZE_LIMIT , 0 , TimeUnit .MILLISECONDS ,
94- new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ), new DTThreadFactory ("dirtyDataConsumer" , true ), new ThreadPoolExecutor .CallerRunsPolicy ());
95- dirtyDataConsumer .execute (consumer );
96- return manager ;
91+ INSTANCE .setBlockingInterval (Long .parseLong (
92+ String .valueOf (properties .getOrDefault (DIRTY_BLOCK_STR , DEFAULT_BLOCKING_INTERVAL ))));
93+ INSTANCE .setErrorLimitRate (Double .parseDouble (
94+ String .valueOf (properties .getOrDefault (DIRTY_LIMIT_RATE_STR , DEFAULT_ERROR_LIMIT_RATE ))));
95+
96+ INSTANCE .setConsumer (properties );
97+ return INSTANCE ;
9798 } catch (Exception e ) {
9899 throw new RuntimeException ("create dirtyManager error!" , e );
99100 }
100101 }
101102
103+ private void setConsumer (Map <String , Object > properties ) throws Exception {
104+ consumer = DirtyConsumerFactory .getDirtyConsumer (
105+ String .valueOf (properties .getOrDefault (PLUGIN_TYPE_STR , DEFAULT_TYPE )),
106+ String .valueOf (properties .get (PLUGIN_PATH_STR )),
107+ String .valueOf (properties .get (PLUGIN_LOAD_MODE_STR ))
108+ );
109+ consumer .init (properties );
110+ consumer .setQueue (new LinkedBlockingQueue <>());
111+ }
112+
113+ public void execute () {
114+ if (Objects .isNull (dirtyDataConsumer )) {
115+ dirtyDataConsumer = new ThreadPoolExecutor (
116+ MAX_POOL_SIZE_LIMIT ,
117+ MAX_POOL_SIZE_LIMIT ,
118+ 0 ,
119+ TimeUnit .MILLISECONDS ,
120+ new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ),
121+ new DTThreadFactory ("dirtyDataConsumer" , true ),
122+ new ThreadPoolExecutor .CallerRunsPolicy ());
123+ dirtyDataConsumer .execute (consumer );
124+ }
125+ }
126+
102127 /**
103128 * 设置脏数据插件默认配置
104129 *
@@ -113,7 +138,6 @@ public static String buildDefaultDirty() {
113138
114139 /**
115140 * 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
116- * TODO consumer 关闭时仍有数据没有消费到,假如有500条数据,在结束时实际消费数量可能只有493
117141 */
118142 public void close () {
119143 if (checkConsumer ()) {
@@ -150,4 +174,28 @@ public void collectDirtyData(String dataInfo, String cause) {
150174 public boolean checkConsumer () {
151175 return consumer .isRunning ();
152176 }
177+
178+ public AtomicLong getCount () {
179+ return count ;
180+ }
181+
182+ public AtomicLong getErrorCount () {
183+ return errorCount ;
184+ }
185+
186+ public long getBlockingInterval () {
187+ return blockingInterval ;
188+ }
189+
190+ public void setBlockingInterval (long blockingInterval ) {
191+ this .blockingInterval = blockingInterval ;
192+ }
193+
194+ public double getErrorLimitRate () {
195+ return errorLimitRate ;
196+ }
197+
198+ public void setErrorLimitRate (double errorLimitRate ) {
199+ this .errorLimitRate = errorLimitRate ;
200+ }
153201}
0 commit comments