3333import java .util .concurrent .TimeUnit ;
3434import java .util .concurrent .atomic .AtomicLong ;
3535
36+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_BLOCKING_INTERVAL ;
37+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_ERROR_LIMIT_RATE ;
38+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_PRINT_LIMIT ;
39+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DEFAULT_TYPE ;
40+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DIRTY_BLOCK_STR ;
41+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .DIRTY_LIMIT_RATE_STR ;
42+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PLUGIN_LOAD_MODE_STR ;
43+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PLUGIN_PATH_STR ;
44+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PLUGIN_TYPE_STR ;
45+ import static com .dtstack .flink .sql .dirtyManager .manager .DirtyKeys .PRINT_LIMIT_STR ;
46+
3647/**
3748 * @author tiezhu
3849 * Company dtstack
@@ -43,11 +54,7 @@ public class DirtyDataManager implements Serializable {
4354 public final static int MAX_POOL_SIZE_LIMIT = 5 ;
4455 private static final long serialVersionUID = 7190970299538893497L ;
4556 private static final Logger LOG = LoggerFactory .getLogger (DirtyDataManager .class );
46- private static final String DIRTY_BLOCK_STR = "blockingInterval" ;
47- private static final String DIRTY_LIMIT_RATE_STR = "errorLimitRate" ;
4857 private final static int MAX_TASK_QUEUE_SIZE = 100 ;
49- private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8" ;
50- private final static String DEFAULT_BLOCKING_INTERVAL = "60" ;
5158 public static AbstractDirtyDataConsumer consumer ;
5259
5360 private static ThreadPoolExecutor dirtyDataConsumer ;
@@ -77,14 +84,14 @@ public static DirtyDataManager newInstance(Properties properties) {
7784 manager .blockingInterval = Long .parseLong (String .valueOf (properties .getOrDefault (DIRTY_BLOCK_STR , DEFAULT_BLOCKING_INTERVAL )));
7885 manager .errorLimitRate = Double .parseDouble (String .valueOf (properties .getOrDefault (DIRTY_LIMIT_RATE_STR , DEFAULT_ERROR_LIMIT_RATE )));
7986 consumer = DirtyConsumerFactory .getDirtyConsumer (
80- properties .getProperty ("type" )
81- , properties .getProperty ("pluginPath" )
82- , properties .getProperty ("pluginLoadMode" )
87+ properties .getProperty (PLUGIN_TYPE_STR , DEFAULT_TYPE )
88+ , properties .getProperty (PLUGIN_PATH_STR )
89+ , properties .getProperty (PLUGIN_LOAD_MODE_STR )
8390 );
8491 consumer .init (properties );
8592 consumer .setQueue (new LinkedBlockingQueue <>());
8693 dirtyDataConsumer = new ThreadPoolExecutor (MAX_POOL_SIZE_LIMIT , MAX_POOL_SIZE_LIMIT , 0 , TimeUnit .MILLISECONDS ,
87- new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ), new DTThreadFactory ("dirtyDataConsumer" ), new ThreadPoolExecutor .CallerRunsPolicy ());
94+ new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ), new DTThreadFactory ("dirtyDataConsumer" , true ), new ThreadPoolExecutor .CallerRunsPolicy ());
8895 dirtyDataConsumer .execute (consumer );
8996 return manager ;
9097 } catch (Exception e ) {
@@ -99,8 +106,8 @@ public static DirtyDataManager newInstance(Properties properties) {
99106 */
100107 public static String buildDefaultDirty () {
101108 JSONObject jsonObject = new JSONObject ();
102- jsonObject .put ("type" , "console" );
103- jsonObject .put ("printLimit" , "1000" );
109+ jsonObject .put (PLUGIN_TYPE_STR , DEFAULT_TYPE );
110+ jsonObject .put (PRINT_LIMIT_STR , DEFAULT_PRINT_LIMIT );
104111 return jsonObject .toJSONString ();
105112 }
106113
@@ -128,6 +135,8 @@ public void collectDirtyData(String dataInfo, String cause) {
128135 LOG .warn ("dirty Data insert error ... Failed number: " + errorCount .incrementAndGet ());
129136 LOG .warn ("error dirty data:" + dirtyDataEntity .toString ());
130137 if (errorCount .get () > Math .ceil (count .longValue () * errorLimitRate )) {
138+ // close consumer and manager
139+ close ();
131140 throw new RuntimeException (String .format ("The number of failed number 【%s】 reaches the limit, manager fails" , errorCount .get ()));
132141 }
133142 }
0 commit comments