Skip to content

Commit c11c73f

Browse files
committed
[feat] 脏数据插件化:结构改为订阅者模式
1 parent 25d59f6 commit c11c73f

13 files changed

Lines changed: 643 additions & 2 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020

2121
package com.dtstack.flink.sql.util;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
2423
import com.dtstack.flink.sql.enums.EPluginLoadMode;
25-
import org.apache.commons.lang3.StringUtils;
2624
import com.fasterxml.jackson.core.JsonGenerationException;
2725
import com.fasterxml.jackson.core.JsonParseException;
2826
import com.fasterxml.jackson.databind.JsonMappingException;
2927
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import org.apache.commons.lang3.StringUtils;
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231

dirtyDataManager/manager/pom.xml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.dirtyDataManager</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>manager</artifactId>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>com.fasterxml.jackson.core</groupId>
17+
<artifactId>jackson-databind</artifactId>
18+
<version>2.10.3</version>
19+
</dependency>
20+
21+
<dependency>
22+
<groupId>log4j</groupId>
23+
<artifactId>log4j</artifactId>
24+
<version>1.2.17</version>
25+
</dependency>
26+
27+
<dependency>
28+
<groupId>org.slf4j</groupId>
29+
<artifactId>slf4j-simple</artifactId>
30+
<version>1.7.30</version>
31+
</dependency>
32+
</dependencies>
33+
34+
</project>
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirty.classLoader;
20+
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
import java.net.URL;
25+
import java.net.URLClassLoader;
26+
27+
/**
28+
* @author tiezhu
29+
* Company dtstack
30+
* Date 2020/8/27 星期四
31+
*/
32+
public class DirtyManagerClassLoader extends URLClassLoader {
33+
private final Logger LOG = LoggerFactory.getLogger(DirtyManagerClassLoader.class);
34+
35+
/**
36+
* The parent class loader.
37+
*/
38+
protected ClassLoader parent;
39+
40+
public DirtyManagerClassLoader(URL[] urls, ClassLoader parent) {
41+
super(urls, parent);
42+
this.parent = parent;
43+
}
44+
45+
46+
@Override
47+
public Class<?> loadClass(String name) throws ClassNotFoundException {
48+
return this.loadClass(name, false);
49+
}
50+
51+
@Override
52+
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
53+
synchronized (getClassLoadingLock(name)) {
54+
if (LOG.isDebugEnabled()) {
55+
LOG.debug("loadClass(" + name + ", " + resolve + ")");
56+
}
57+
Class<?> clazz;
58+
59+
// (0.1) Check our previously loaded class cache
60+
clazz = findLoadedClass(name);
61+
if (clazz != null) {
62+
if (LOG.isDebugEnabled()) {
63+
LOG.debug(" Returning class from cache");
64+
}
65+
if (resolve) {
66+
resolveClass(clazz);
67+
}
68+
return (clazz);
69+
}
70+
71+
// (2) Search local repositories
72+
if (LOG.isDebugEnabled()) {
73+
LOG.debug(" Searching local repositories");
74+
}
75+
try {
76+
clazz = findClass(name);
77+
if (clazz != null) {
78+
if (LOG.isDebugEnabled()) {
79+
LOG.debug(" Loading class from local repository");
80+
}
81+
if (resolve) {
82+
resolveClass(clazz);
83+
}
84+
return (clazz);
85+
}
86+
} catch (ClassNotFoundException e) {
87+
// Ignore
88+
}
89+
90+
if (LOG.isDebugEnabled()) {
91+
LOG.debug(" Delegating to parent classloader at end: " + parent);
92+
}
93+
94+
try {
95+
clazz = Class.forName(name, false, parent);
96+
if (LOG.isDebugEnabled()) {
97+
LOG.debug(" Loading class from parent");
98+
}
99+
if (resolve) {
100+
resolveClass(clazz);
101+
}
102+
return (clazz);
103+
} catch (ClassNotFoundException e) {
104+
// Ignore
105+
}
106+
}
107+
108+
throw new ClassNotFoundException(name);
109+
}
110+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirty.consumer;
20+
21+
import com.dtstack.flink.sql.dirty.entity.DirtyDataEntity;
22+
23+
import java.io.Serializable;
24+
import java.util.concurrent.LinkedBlockingQueue;
25+
26+
/**
27+
* @author tiezhu
28+
* Company dtstack
29+
* Date 2020/8/27 星期四
30+
*/
31+
public abstract class AbstractDirtyDataConsumer extends Thread implements Serializable {
32+
private static final long serialVersionUID = -6058598201315176687L;
33+
34+
private long count = 0;
35+
36+
protected LinkedBlockingQueue<DirtyDataEntity> queue;
37+
38+
/**
39+
* 消费队列数据
40+
*/
41+
public abstract void consume() throws InterruptedException;
42+
43+
/**
44+
* 关闭消费者,需要释放资源
45+
*/
46+
public abstract void close();
47+
48+
/**
49+
* 初始化消费者,初始化定时任务
50+
*/
51+
public void init() {
52+
53+
}
54+
55+
@Override
56+
public void run() {
57+
58+
}
59+
60+
public void setQueue(LinkedBlockingQueue<DirtyDataEntity> queue) {
61+
this.queue = queue;
62+
}
63+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirty.entity;
20+
21+
import java.sql.Date;
22+
23+
/**
24+
* @author tiezhu
25+
* Company dtstack
26+
* Date 2020/8/27 星期四
27+
*/
28+
public class DirtyDataEntity {
29+
/**
30+
* 脏数据信息内容
31+
*/
32+
private String dirtyData;
33+
34+
/**
35+
* 脏数据处理时间
36+
*/
37+
private Date processDate;
38+
39+
/**
40+
* 产生脏数据的原因
41+
*/
42+
private String cause;
43+
44+
/**
45+
* 产生异常的字段
46+
*/
47+
private String field;
48+
49+
public String getDirtyData() {
50+
return dirtyData;
51+
}
52+
53+
public void setDirtyData(String dirtyData) {
54+
this.dirtyData = dirtyData;
55+
}
56+
57+
public Date getProcessDate() {
58+
return processDate;
59+
}
60+
61+
public void setProcessDate(Date processDate) {
62+
this.processDate = processDate;
63+
}
64+
65+
public String getCause() {
66+
return cause;
67+
}
68+
69+
public void setCause(String cause) {
70+
this.cause = cause;
71+
}
72+
73+
public String getField() {
74+
return field;
75+
}
76+
77+
public void setField(String field) {
78+
this.field = field;
79+
}
80+
81+
public DirtyDataEntity(String dirtyData, Long processDate, String cause, String field) {
82+
this.dirtyData = dirtyData;
83+
this.processDate = new Date(processDate);
84+
this.cause = cause;
85+
this.field = field;
86+
}
87+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirty.manager;
20+
21+
import com.dtstack.flink.sql.dirty.consumer.AbstractDirtyDataConsumer;
22+
import com.dtstack.flink.sql.dirty.entity.DirtyDataEntity;
23+
24+
import java.io.Serializable;
25+
import java.util.Map;
26+
import java.util.concurrent.LinkedBlockingQueue;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
30+
/**
31+
* @author tiezhu
32+
* Company dtstack
33+
* Date 2020/8/27 星期四
34+
*/
35+
public class DirtyDataManager implements Serializable {
36+
private static final long serialVersionUID = 7190970299538893497L;
37+
38+
/**
39+
* 写入队列阻塞时间
40+
*/
41+
private long blockingInterval = 60;
42+
43+
/**
44+
* 缓存脏数据信息队列
45+
*/
46+
private LinkedBlockingQueue<DirtyDataEntity> queue = new LinkedBlockingQueue<>();
47+
48+
/**
49+
* 统计manager收集到的脏数据条数
50+
*/
51+
private AtomicLong count = new AtomicLong(0);
52+
53+
private AbstractDirtyDataConsumer consumer;
54+
55+
public static DirtyDataManager newInstance() {
56+
57+
return new DirtyDataManager();
58+
}
59+
60+
public AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
61+
// 利用类加载的方式动态加载
62+
String type = properties.getOrDefault("type", "print");
63+
return null;
64+
}
65+
66+
public void close() throws InterruptedException {
67+
if (!queue.isEmpty()) {
68+
flush();
69+
}
70+
}
71+
72+
public void flush() throws InterruptedException {
73+
consumer.setQueue(queue);
74+
consumer.consume();
75+
}
76+
77+
// 收集脏数据
78+
public void collectDirtyData(String dataInfo, String cause, String field) throws InterruptedException {
79+
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause, field);
80+
queue.offer(dirtyDataEntity, blockingInterval, TimeUnit.MILLISECONDS);
81+
count.incrementAndGet();
82+
consumer.setQueue(queue);
83+
}
84+
85+
}

0 commit comments

Comments
 (0)