Skip to content

Commit b16a9fd

Browse files
committed
[feat] add LocalTest and TestLocalTest
1 parent 622b4b8 commit b16a9fd

13 files changed

Lines changed: 240 additions & 63 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
6565
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
6666
return classLoader;
6767
} catch (Throwable e) {
68-
LOG.error("retrieve ClassLoad happens error:{}", e);
68+
LOG.error("retrieve ClassLoad happens error", e);
6969
throw new RuntimeException("retrieve ClassLoad happens error");
7070
}
7171
});

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,15 @@
2525
import com.dtstack.flink.sql.parser.SqlParser;
2626
import com.dtstack.flink.sql.parser.SqlTree;
2727
import org.apache.flink.api.common.typeinfo.TypeInformation;
28-
import org.apache.flink.api.java.tuple.Tuple2;
2928
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3029
import org.apache.flink.streaming.api.datastream.DataStream;
3130
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3231
import org.apache.flink.table.api.*;
33-
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
3432
import org.apache.flink.table.api.java.StreamTableEnvironment;
3533
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
3634
import org.apache.flink.table.sinks.TableSink;
37-
import org.apache.flink.types.Row;
3835

3936
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
40-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
4137
import com.dtstack.flink.sql.enums.ClusterMode;
4238
import com.dtstack.flink.sql.enums.ECacheType;
4339
import com.dtstack.flink.sql.enums.EPluginLoadMode;
@@ -108,7 +104,6 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
108104
String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
109105
String pluginLoadMode = options.getPluginLoadMode();
110106
String deployMode = options.getMode();
111-
String logLevel = options.getLogLevel();
112107

113108
Preconditions.checkArgument(checkRemoteSqlPluginPath(remoteSqlPluginPath, deployMode, pluginLoadMode),
114109
"Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required");
@@ -324,6 +319,9 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
324319
throw new RuntimeException("not support table type:" + tableInfo.getType());
325320
}
326321
}
322+
if (localSqlPluginPath == null || localSqlPluginPath.isEmpty()) {
323+
return Sets.newHashSet();
324+
}
327325
return pluginClassPathSets;
328326
}
329327

@@ -351,7 +349,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
351349
}
352350

353351

354-
private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, Properties confProperties) {
352+
public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, Properties confProperties) {
355353
// use blink and streammode
356354
EnvironmentSettings settings = EnvironmentSettings.newInstance()
357355
.useBlinkPlanner()

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,8 @@ public static class Builder {
114114
private String remoteSqlPluginPath;
115115
private String pluginLoadMode;
116116
private String deployMode;
117-
private String logLevel;
118117
private Properties confProp;
119118

120-
121119
public ParamsInfo.Builder setSql(String sql) {
122120
this.sql = sql;
123121
return this;

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ public class Options {
4545
@OptionRequired(description = "Yarn and Hadoop configuration directory")
4646
private String yarnconf;
4747

48-
@OptionRequired(required = true,description = "Sql local plugin root")
48+
@OptionRequired(description = "Sql local plugin root")
4949
private String localSqlPluginPath;
5050

51-
@OptionRequired(required = false,description = "Sql remote plugin root")
51+
@OptionRequired(description = "Sql remote plugin root")
5252
private String remoteSqlPluginPath ;
5353

5454
@OptionRequired(description = "sql ext jar,eg udf jar")
@@ -69,9 +69,6 @@ public class Options {
6969
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
7070
private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
7171

72-
@OptionRequired(description = "log level")
73-
private String logLevel = "info";
74-
7572
@OptionRequired(description = "file add to ship file")
7673
private String addShipfile;
7774

@@ -180,14 +177,6 @@ public void setPluginLoadMode(String pluginLoadMode) {
180177
this.pluginLoadMode = pluginLoadMode;
181178
}
182179

183-
public String getLogLevel() {
184-
return logLevel;
185-
}
186-
187-
public void setLogLevel(String logLevel) {
188-
this.logLevel = logLevel;
189-
}
190-
191180
public String getAddShipfile() {
192181
return addShipfile;
193182
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ private Map parseProp(String propsStr){
7474
propsStr = propsStr.replaceAll("'\\s*,", "'|");
7575
String[] strs = propsStr.trim().split("\\|");
7676
Map<String, Object> propMap = Maps.newHashMap();
77-
for(int i=0; i<strs.length; i++){
78-
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
77+
for (String str : strs) {
78+
List<String> ss = DtStringUtil.splitIgnoreQuota(str, '=');
7979
String key = ss.get(0).trim();
8080
String value = extractValue(ss.get(1).trim());
8181
propMap.put(key, value);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ public void exec(String sql,
9797
Map<String, Table> tableCache,
9898
CreateTmpTableParser.SqlParserResult createView,
9999
String scope) throws Exception {
100-
if(localSqlPluginPath == null){
101-
throw new RuntimeException("need to set localSqlPluginPath");
102-
}
103-
104100
localTableCache.putAll(tableCache);
105101
try {
106102
sidePredicatesParser.fillPredicatesForSideTable(sql, sideTableMap);

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,15 @@ public static String getJarFileDirPath(String type, String sqlRootDir){
7474
String jarPath = sqlRootDir + SP + type;
7575
File jarFile = new File(jarPath);
7676

77-
if(!jarFile.exists()){
78-
throw new RuntimeException(String.format("path %s not exists!!!", jarPath));
79-
}
77+
// if(!jarFile.exists()){
78+
// throw new RuntimeException(String.format("path %s not exists!!!", jarPath));
79+
// }
8080

8181
return jarPath;
8282
}
8383

8484
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir) throws MalformedURLException {
85-
String dirName = sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
86-
File jarFile = new File(dirName);
87-
88-
if(!jarFile.exists()){
89-
throw new RuntimeException(String.format("path %s not exists!!!", dirName));
90-
}
91-
92-
return dirName;
85+
return sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
9386
}
9487

9588
public static String getGenerClassName(String pluginTypeName, String type) throws IOException {
@@ -181,42 +174,33 @@ public static void addPluginJar(String pluginDir, DtClassLoader classLoader) thr
181174
public static URL[] getPluginJarUrls(String pluginDir) throws MalformedURLException {
182175
List<URL> urlList = new ArrayList<>();
183176
File dirFile = new File(pluginDir);
184-
if(!dirFile.exists() || !dirFile.isDirectory()){
185-
throw new RuntimeException("plugin path:" + pluginDir + "is not exist.");
186-
}
187177

188178
File[] files = dirFile.listFiles(tmpFile -> tmpFile.isFile() && tmpFile.getName().endsWith(JAR_SUFFIX));
189-
if(files == null || files.length == 0){
190-
throw new RuntimeException("plugin path:" + pluginDir + " is null.");
179+
180+
if (files == null || files.length == 0) {
181+
return urlList.toArray(new URL[0]);
191182
}
192183

193184
for(File file : files){
194185
URL pluginJarUrl = file.toURI().toURL();
195186
urlList.add(pluginJarUrl);
196187
}
197-
return urlList.toArray(new URL[urlList.size()]);
188+
189+
return urlList.toArray(new URL[0]);
198190
}
199191

200192
public static String getCoreJarFileName (String path, String prefix) throws Exception {
201193
String coreJarFileName = null;
202194
File pluginDir = new File(path);
203195
if (pluginDir.exists() && pluginDir.isDirectory()){
204-
File[] jarFiles = pluginDir.listFiles(new FilenameFilter() {
205-
@Override
206-
public boolean accept(File dir, String name) {
207-
return name.toLowerCase().startsWith(prefix) && name.toLowerCase().endsWith(".jar");
208-
}
209-
});
196+
File[] jarFiles = pluginDir.listFiles((dir, name) ->
197+
name.toLowerCase().startsWith(prefix) && name.toLowerCase().endsWith(".jar"));
210198

211199
if (jarFiles != null && jarFiles.length > 0){
212200
coreJarFileName = jarFiles[0].getName();
213201
}
214202
}
215203

216-
if (StringUtils.isEmpty(coreJarFileName)){
217-
throw new Exception("Can not find core jar file in path:" + path);
218-
}
219-
220204
return coreJarFileName;
221205
}
222206
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.IOException;
3838
import java.io.InputStreamReader;
3939
import java.net.URLDecoder;
40+
import java.nio.charset.StandardCharsets;
4041
import java.util.LinkedList;
4142
import java.util.List;
4243
import java.util.Map;
@@ -59,7 +60,7 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
5960
OptionParser optionParser = new OptionParser(args);
6061
Options launcherOptions = optionParser.getOptions();
6162
List<String> programExeArgList = optionParser.getProgramExeArgList();
62-
String[] execArgs = programExeArgList.toArray(new String[programExeArgList.size()]);
63+
String[] execArgs = programExeArgList.toArray(new String[0]);
6364

6465
String name = launcherOptions.getName();
6566
String mode = launcherOptions.getMode();
@@ -95,14 +96,14 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
9596

9697
private static String[] parseJson(String[] args) {
9798
BufferedReader reader = null;
98-
String lastStr = "";
99+
StringBuilder lastStr = new StringBuilder();
99100
try {
100101
FileInputStream fileInputStream = new FileInputStream(args[0]);
101-
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
102+
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, StandardCharsets.UTF_8);
102103
reader = new BufferedReader(inputStreamReader);
103-
String tempString = null;
104+
String tempString;
104105
while ((tempString = reader.readLine()) != null) {
105-
lastStr += tempString;
106+
lastStr.append(tempString);
106107
}
107108
reader.close();
108109
} catch (IOException e) {
@@ -116,16 +117,15 @@ private static String[] parseJson(String[] args) {
116117
}
117118
}
118119
}
119-
Map<String, Object> map = JSON.parseObject(lastStr, new TypeReference<Map<String, Object>>() {
120+
Map<String, Object> map = JSON.parseObject(lastStr.toString(), new TypeReference<Map<String, Object>>() {
120121
});
121122
List<String> list = new LinkedList<>();
122123

123124
for (Map.Entry<String, Object> entry : map.entrySet()) {
124125
list.add("-" + entry.getKey());
125126
list.add(entry.getValue().toString());
126127
}
127-
String[] array = list.toArray(new String[list.size()]);
128-
return array;
128+
return list.toArray(new String[0]);
129129
}
130130

131131

localTest/pom.xml

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>flink.sql</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sql.localTest</artifactId>
13+
<packaging>pom</packaging>
14+
15+
<properties>
16+
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
17+
</properties>
18+
19+
<dependencies>
20+
21+
<dependency>
22+
<groupId>org.slf4j</groupId>
23+
<artifactId>slf4j-api</artifactId>
24+
<version>1.6.3</version>
25+
<scope>compile</scope>
26+
</dependency>
27+
28+
<dependency>
29+
<groupId>ch.qos.logback</groupId>
30+
<artifactId>logback-classic</artifactId>
31+
<version>1.1.7</version>
32+
<scope>compile</scope>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>junit</groupId>
37+
<artifactId>junit</artifactId>
38+
<version>4.12</version>
39+
<scope>test</scope>
40+
</dependency>
41+
42+
<dependency>
43+
<groupId>com.dtstack.flink</groupId>
44+
<artifactId>sql.core</artifactId>
45+
<version>${sql.core.version}</version>
46+
</dependency>
47+
48+
<dependency>
49+
<groupId>com.dtstack.flink</groupId>
50+
<artifactId>sql.source.kafka-base</artifactId>
51+
<version>1.0-SNAPSHOT</version>
52+
</dependency>
53+
54+
<!-- <dependency>-->
55+
<!-- <groupId>com.dtstack.flink</groupId>-->
56+
<!-- <artifactId>sql.source.kafka</artifactId>-->
57+
<!-- <version>1.0-SNAPSHOT</version>-->
58+
<!-- </dependency>-->
59+
60+
<!-- <dependency>-->
61+
<!-- <groupId>com.dtstack.flink</groupId>-->
62+
<!-- <artifactId>sql.source.kafka09</artifactId>-->
63+
<!-- <version>1.0-SNAPSHOT</version>-->
64+
<!-- </dependency>-->
65+
66+
<!-- <dependency>-->
67+
<!-- <groupId>com.dtstack.flink</groupId>-->
68+
<!-- <artifactId>sql.source.kafka10</artifactId>-->
69+
<!-- <version>1.0-SNAPSHOT</version>-->
70+
<!-- </dependency>-->
71+
72+
<dependency>
73+
<groupId>com.dtstack.flink</groupId>
74+
<artifactId>sql.source.kafka11</artifactId>
75+
<version>1.0-SNAPSHOT</version>
76+
</dependency>
77+
78+
<dependency>
79+
<groupId>com.dtstack.flink</groupId>
80+
<artifactId>sql.mysql</artifactId>
81+
<version>1.0-SNAPSHOT</version>
82+
</dependency>
83+
84+
<dependency>
85+
<groupId>com.dtstack.flink</groupId>
86+
<artifactId>sql.side.all.mysql</artifactId>
87+
<version>1.0-SNAPSHOT</version>
88+
</dependency>
89+
90+
<dependency>
91+
<groupId>com.dtstack.flink</groupId>
92+
<artifactId>sql.sink.mysql</artifactId>
93+
<version>1.0-SNAPSHOT</version>
94+
</dependency>
95+
96+
<dependency>
97+
<groupId>com.dtstack.flink</groupId>
98+
<artifactId>sql.side.async.mysql</artifactId>
99+
<version>1.0-SNAPSHOT</version>
100+
</dependency>
101+
102+
<dependency>
103+
<groupId>com.dtstack.flink</groupId>
104+
<artifactId>sql.console</artifactId>
105+
<version>1.0-SNAPSHOT</version>
106+
</dependency>
107+
108+
<dependency>
109+
<groupId>com.dtstack.flink</groupId>
110+
<artifactId>console-sink</artifactId>
111+
<version>1.0-SNAPSHOT</version>
112+
</dependency>
113+
114+
<dependency>
115+
<groupId>com.alibaba</groupId>
116+
<artifactId>fastjson</artifactId>
117+
<version>1.2.29</version>
118+
<scope>compile</scope>
119+
</dependency>
120+
121+
</dependencies>
122+
123+
</project>

0 commit comments

Comments (0)