Skip to content

Commit 954246c

Browse files
committed
【fix】add LocalTest PluginLoadMode
1 parent 557f350 commit 954246c

4 files changed

Lines changed: 65 additions & 47 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ public enum EPluginLoadMode {
3333
/**
3434
* 1:shipfile
3535
*/
36-
SHIPFILE(1);
36+
SHIPFILE(1),
37+
38+
/**
39+
* 2:localTest
40+
*/
41+
LOCALTEST(3);
3742

3843
private int type;
3944

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import com.fasterxml.jackson.core.JsonParseException;
2828
import com.fasterxml.jackson.databind.JsonMappingException;
2929
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
import java.io.ByteArrayInputStream;
3234
import java.io.File;
33-
import java.io.FilenameFilter;
3435
import java.io.IOException;
3536
import java.net.MalformedURLException;
3637
import java.net.URL;
38+
import java.nio.charset.StandardCharsets;
3739
import java.util.ArrayList;
3840
import java.util.List;
3941
import java.util.Map;
@@ -54,35 +56,51 @@ public class PluginUtil {
5456

5557
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql";
5658

59+
private static final Logger LOG = LoggerFactory.getLogger(PluginUtil.class);
60+
5761
private static ObjectMapper objectMapper = new ObjectMapper();
5862

5963
public static URL buildSourceAndSinkPathByLoadMode(String type, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
6064
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
61-
return getRemoteJarFilePath(type, suffix, remoteSqlPluginPath, localSqlPluginPath);
65+
return getRemoteJarFilePath(type, suffix, remoteSqlPluginPath, localSqlPluginPath, pluginLoadMode);
6266
}
63-
return getLocalJarFilePath(type, suffix, localSqlPluginPath);
67+
return getLocalJarFilePath(type, suffix, localSqlPluginPath, pluginLoadMode);
6468
}
6569

6670
public static URL buildSidePathByLoadMode(String type, String operator, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
6771
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
68-
return getRemoteSideJarFilePath(type, operator, suffix, remoteSqlPluginPath, localSqlPluginPath);
72+
return getRemoteSideJarFilePath(type, operator, suffix, remoteSqlPluginPath, localSqlPluginPath, pluginLoadMode);
6973
}
70-
return getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath);
74+
return getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath, pluginLoadMode);
7175
}
7276

7377
public static String getJarFileDirPath(String type, String sqlRootDir){
7478
String jarPath = sqlRootDir + SP + type;
75-
File jarFile = new File(jarPath);
7679

77-
// if(!jarFile.exists()){
78-
// throw new RuntimeException(String.format("path %s not exists!!!", jarPath));
79-
// }
80+
checkJarFileDirPath(sqlRootDir, jarPath);
8081

8182
return jarPath;
8283
}
8384

8485
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir) throws MalformedURLException {
85-
return sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
86+
String dirName = sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
87+
88+
checkJarFileDirPath(sqlRootDir, dirName);
89+
90+
return dirName;
91+
}
92+
93+
private static void checkJarFileDirPath(String sqlRootDir, String dirName) {
94+
if (sqlRootDir == null || sqlRootDir.isEmpty()) {
95+
LOG.warn("be sure you are not in LocalTest mode, if not, check the sqlRootDir");
96+
return;
97+
}
98+
99+
File jarFile = new File(dirName);
100+
101+
if(!jarFile.exists()){
102+
throw new RuntimeException(String.format("path %s not exists!!!", dirName));
103+
}
86104
}
87105

88106
public static String getGenerClassName(String pluginTypeName, String type) throws IOException {
@@ -112,40 +130,40 @@ public static <T> T jsonStrToObject(String jsonStr, Class<T> clazz) throws JsonP
112130

113131
public static Properties stringToProperties(String str) throws IOException{
114132
Properties properties = new Properties();
115-
properties.load(new ByteArrayInputStream(str.getBytes("UTF-8")));
133+
properties.load(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
116134
return properties;
117135
}
118136

119-
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
120-
return buildFinalJarFilePath(pluginType, tableType, remoteSqlRootDir, localSqlPluginPath);
137+
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {
138+
return buildFinalJarFilePath(pluginType, tableType, remoteSqlRootDir, localSqlPluginPath, pluginLoadMode);
121139
}
122140

123-
public static URL getLocalJarFilePath(String pluginType, String tableType, String localSqlPluginPath) throws Exception {
124-
return buildFinalJarFilePath(pluginType, tableType, null, localSqlPluginPath);
141+
public static URL getLocalJarFilePath(String pluginType, String tableType, String localSqlPluginPath, String pluginLoadMode) throws Exception {
142+
return buildFinalJarFilePath(pluginType, tableType, null, localSqlPluginPath, pluginLoadMode);
125143
}
126144

127-
public static URL buildFinalJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
145+
public static URL buildFinalJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {
128146
String dirName = pluginType + tableType.toLowerCase();
129147
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
130148
String jarPath = localSqlPluginPath + SP + dirName;
131-
String jarName = getCoreJarFileName(jarPath, prefix);
149+
String jarName = getCoreJarFileName(jarPath, prefix, pluginLoadMode);
132150
String sqlRootDir = remoteSqlRootDir == null ? localSqlPluginPath : remoteSqlRootDir;
133151
return new URL("file:" + sqlRootDir + SP + dirName + SP + jarName);
134152
}
135153

136-
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
137-
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, remoteSqlRootDir, localSqlPluginPath);
154+
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {
155+
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, remoteSqlRootDir, localSqlPluginPath, pluginLoadMode);
138156
}
139157

140-
public static URL getLocalSideJarFilePath(String pluginType, String sideOperator, String tableType, String localSqlPluginPath) throws Exception {
141-
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, null, localSqlPluginPath);
158+
public static URL getLocalSideJarFilePath(String pluginType, String sideOperator, String tableType, String localSqlPluginPath, String pluginLoadMode) throws Exception {
159+
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, null, localSqlPluginPath, pluginLoadMode);
142160
}
143161

144-
public static URL buildFinalSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
162+
public static URL buildFinalSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {
145163
String dirName = pluginType + sideOperator + tableType.toLowerCase();
146164
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
147165
String jarPath = localSqlPluginPath + SP + dirName;
148-
String jarName = getCoreJarFileName(jarPath, prefix);
166+
String jarName = getCoreJarFileName(jarPath, prefix, pluginLoadMode);
149167
String sqlRootDir = remoteSqlRootDir == null ? localSqlPluginPath : remoteSqlRootDir;
150168
return new URL("file:" + sqlRootDir + SP + dirName + SP + jarName);
151169
}
@@ -154,8 +172,15 @@ public static String upperCaseFirstChar(String str){
154172
return str.substring(0, 1).toUpperCase() + str.substring(1);
155173
}
156174

157-
public static void addPluginJar(String pluginDir, DtClassLoader classLoader) throws MalformedURLException {
175+
public static URL[] getPluginJarUrls(String pluginDir) throws MalformedURLException {
176+
List<URL> urlList = new ArrayList<>();
177+
158178
File dirFile = new File(pluginDir);
179+
180+
if (pluginDir.contains("null")) {
181+
return urlList.toArray(new URL[0]);
182+
}
183+
159184
if(!dirFile.exists() || !dirFile.isDirectory()){
160185
throw new RuntimeException("plugin path:" + pluginDir + "is not exist.");
161186
}
@@ -165,22 +190,6 @@ public static void addPluginJar(String pluginDir, DtClassLoader classLoader) thr
165190
throw new RuntimeException("plugin path:" + pluginDir + " is null.");
166191
}
167192

168-
for(File file : files){
169-
URL pluginJarUrl = file.toURI().toURL();
170-
classLoader.addURL(pluginJarUrl);
171-
}
172-
}
173-
174-
public static URL[] getPluginJarUrls(String pluginDir) throws MalformedURLException {
175-
List<URL> urlList = new ArrayList<>();
176-
File dirFile = new File(pluginDir);
177-
178-
File[] files = dirFile.listFiles(tmpFile -> tmpFile.isFile() && tmpFile.getName().endsWith(JAR_SUFFIX));
179-
180-
if (files == null || files.length == 0) {
181-
return urlList.toArray(new URL[0]);
182-
}
183-
184193
for(File file : files){
185194
URL pluginJarUrl = file.toURI().toURL();
186195
urlList.add(pluginJarUrl);
@@ -189,7 +198,7 @@ public static URL[] getPluginJarUrls(String pluginDir) throws MalformedURLExcept
189198
return urlList.toArray(new URL[0]);
190199
}
191200

192-
public static String getCoreJarFileName (String path, String prefix) throws Exception {
201+
public static String getCoreJarFileName(String path, String prefix, String pluginLoadMode) throws Exception {
193202
String coreJarFileName = null;
194203
File pluginDir = new File(path);
195204
if (pluginDir.exists() && pluginDir.isDirectory()){
@@ -201,6 +210,10 @@ public static String getCoreJarFileName (String path, String prefix) throws Exce
201210
}
202211
}
203212

213+
if (StringUtils.isEmpty(coreJarFileName) && !pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name())){
214+
throw new Exception("Can not find core jar file in path:" + path);
215+
}
216+
204217
return coreJarFileName;
205218
}
206219
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/utils/JobGraphBuildUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public class JobGraphBuildUtil {
5656
private static final String SP = File.separator;
5757
private static final String CORE_JAR = "core";
5858

59-
private static String findLocalCoreJarPath(String localSqlRootJar) throws Exception {
60-
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
59+
private static String findLocalCoreJarPath(String localSqlRootJar, String pluginLoadMode) throws Exception {
60+
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR, pluginLoadMode);
6161
return localSqlRootJar + SP + jarPath;
6262
}
6363

@@ -67,7 +67,7 @@ public static JobGraph buildJobGraph(JobParamsInfo jobParamsInfo) throws Excepti
6767
String flinkConfDir = jobParamsInfo.getFlinkConfDir();
6868

6969
String[] execArgs = jobParamsInfo.getExecArgs();
70-
File coreJarFile = new File(findLocalCoreJarPath(jobParamsInfo.getLocalPluginRoot()));
70+
File coreJarFile = new File(findLocalCoreJarPath(jobParamsInfo.getLocalPluginRoot(), jobParamsInfo.getPluginLoadMode()));
7171
SavepointRestoreSettings savepointRestoreSettings = dealSavepointRestoreSettings(jobParamsInfo.getConfProperties());
7272

7373
PackagedProgram program = PackagedProgram.newBuilder()

localTest/src/main/java/com/dtstack/flink/sql/localTest/LocalTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ public static void main(String[] args) throws Exception {
3333

3434
//其他参数配置
3535
properties.put("time.characteristic", "eventTime");
36-
36+
3737
// 任务配置参数
3838
conf.put("-sql", URLEncoder.encode(readSQL(sqlPath), StandardCharsets.UTF_8.name()));
3939
conf.put("-mode", "local");
4040
conf.put("-name", "flinkStreamSQLLocalTest");
4141
conf.put("-confProp", properties.toString());
42-
conf.put("-pluginLoadMode", "CLASSPATH");
42+
conf.put("-pluginLoadMode", "LocalTest");
4343

4444
for (Map.Entry<String, Object> keyValue : conf.entrySet()) {
4545
propertiesList.add(keyValue.getKey());

0 commit comments

Comments
 (0)