Skip to content

Commit 684b9b1

Browse files
committed
[feat] 通过URLClassLoader动态加载 consumer
1 parent f825e9b commit 684b9b1

12 files changed

Lines changed: 209 additions & 218 deletions

File tree

dirtyDataManager/manager/src/main/java/com/dtstack/flink/sql/dirty/consumer/AbstractDirtyDataConsumer.java renamed to core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.dirty.consumer;
19+
package com.dtstack.flink.sql.dirtyManager.consumer;
2020

21-
import com.dtstack.flink.sql.dirty.entity.DirtyDataEntity;
21+
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
2222

2323
import java.io.Serializable;
2424
import java.util.concurrent.LinkedBlockingQueue;

dirtyDataManager/manager/src/main/java/com/dtstack/flink/sql/dirty/entity/DirtyDataEntity.java renamed to core/src/main/java/com/dtstack/flink/sql/dirtyManager/entity/DirtyDataEntity.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.dirty.entity;
19+
package com.dtstack.flink.sql.dirtyManager.entity;
2020

2121
import java.sql.Date;
2222

dirtyDataManager/manager/src/main/java/com/dtstack/flink/sql/dirty/manager/DirtyDataManager.java renamed to core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,19 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.dirty.manager;
19+
package com.dtstack.flink.sql.dirtyManager.manager;
2020

21-
import com.dtstack.flink.sql.dirty.consumer.AbstractDirtyDataConsumer;
22-
import com.dtstack.flink.sql.dirty.entity.DirtyDataEntity;
21+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
23+
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
24+
import com.dtstack.flink.sql.util.PluginUtil;
2325

26+
import java.io.File;
2427
import java.io.Serializable;
28+
import java.lang.reflect.Constructor;
29+
import java.net.URL;
2530
import java.util.Map;
31+
import java.util.Set;
2632
import java.util.concurrent.LinkedBlockingQueue;
2733
import java.util.concurrent.TimeUnit;
2834
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +41,12 @@
3541
public class DirtyDataManager implements Serializable {
3642
private static final long serialVersionUID = 7190970299538893497L;
3743

44+
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
45+
46+
private static final String CLASS_POST_STR = "DirtyDataConsumer";
47+
48+
private static final String DIRTY_CONSUMER_PATH = "dirtyData";
49+
3850
/**
3951
* 写入队列阻塞时间
4052
*/
@@ -50,7 +62,7 @@ public class DirtyDataManager implements Serializable {
5062
*/
5163
private AtomicLong count = new AtomicLong(0);
5264

53-
private AbstractDirtyDataConsumer consumer;
65+
private AbstractDirtyDataConsumer consumer;
5466

5567
public static DirtyDataManager newInstance() {
5668

@@ -60,7 +72,15 @@ public static DirtyDataManager newInstance() {
6072
public AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
6173
// 利用类加载的方式动态加载
6274
String type = properties.getOrDefault("type", "print");
63-
return null;
75+
String consumerType = DIRTY_CONSUMER_PATH + File.separator + type;
76+
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, properties.getOrDefault("pluginPath", null), "shipfile");
77+
String className = CLASS_PRE_STR + "." + type.toLowerCase() + "." + upperCaseFirstChar(type + CLASS_POST_STR);
78+
79+
return ClassLoaderManager.newInstance(consumerJar, cl -> {
80+
Class<?> clazz = cl.loadClass(className);
81+
Constructor<?> constructor = clazz.getConstructor(String.class);
82+
return (AbstractDirtyDataConsumer) constructor.newInstance(type + CLASS_POST_STR);
83+
});
6484
}
6585

6686
public void close() throws InterruptedException {
@@ -82,4 +102,7 @@ public void collectDirtyData(String dataInfo, String cause, String field) throws
82102
consumer.setQueue(queue);
83103
}
84104

105+
private static String upperCaseFirstChar(String str) {
106+
return str.substring(0, 1).toUpperCase() + str.substring(1);
107+
}
85108
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.dtstack.flink.sql.dirtyManager;
2+
3+
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
4+
import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
5+
import org.junit.Assert;
6+
import org.junit.Test;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
11+
/**
12+
* @author tiezhu
13+
* Company dtstack
14+
* Date 2020/8/28 星期五
15+
*/
16+
public class TestDirtyDataManager {
17+
@Test
18+
public void testCreateConsumer() throws Exception {
19+
Map<String, String> properties = new HashMap<>();
20+
properties.put("type", "print");
21+
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQL/sqlplugins");
22+
23+
DirtyDataManager manager = DirtyDataManager.newInstance();
24+
AbstractDirtyDataConsumer consumer = manager.createConsumer(properties);
25+
Assert.assertNotNull(consumer);
26+
}
27+
}

dirtyDataManager/manager/pom.xml

Lines changed: 0 additions & 34 deletions
This file was deleted.

dirtyDataManager/manager/src/main/java/com/dtstack/flink/sql/dirty/classLoader/DirtyManagerClassLoader.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

dirtyDataManager/manager/src/main/java/com/dtstack/flink/sql/dirty/util/ManagerUtil.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)