Skip to content

Commit 70da6a7

Browse files
WTZ468071157WTZ468071157
authored andcommitted
[feat] add addShipfile
1 parent 2c1186c commit 70da6a7

5 files changed

Lines changed: 55 additions & 60 deletions

File tree

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
7171
String udfJar = launcherOptions.getAddjar();
7272
String queue = launcherOptions.getQueue();
7373
String pluginLoadMode = launcherOptions.getPluginLoadMode();
74+
String addShipfile = launcherOptions.getAddShipfile();
7475

7576
String yarnSessionConf = URLDecoder.decode(launcherOptions.getYarnSessionConf(), Charsets.UTF_8.toString());
7677
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
@@ -91,6 +92,7 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
9192
.setFlinkJarPath(flinkJarPath)
9293
.setPluginLoadMode(pluginLoadMode)
9394
.setQueue(queue)
95+
.setAddShipfile(addShipfile)
9496
.build();
9597
}
9698

launcher/src/main/java/com/dtstack/flink/sql/launcher/entity/JobParamsInfo.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,31 @@
2222
import java.util.Properties;
2323

2424
/**
25-
* parse the original mission parameters
25+
* parse the original mission parameters
2626
* Date: 2020/3/4
2727
* Company: www.dtstack.com
28+
*
2829
* @author maqi
2930
*/
3031
public class JobParamsInfo {
3132

32-
private String mode;
33-
private String name;
34-
private String queue;
35-
private String localPluginRoot;
36-
private String flinkConfDir;
37-
private String flinkJarPath;
38-
private String yarnConfDir;
39-
private String pluginLoadMode;
40-
private String udfJar;
41-
private String[] execArgs;
42-
private Properties confProperties;
43-
private Properties yarnSessionConfProperties;
33+
private final String mode;
34+
private final String name;
35+
private final String queue;
36+
private final String localPluginRoot;
37+
private final String flinkConfDir;
38+
private final String flinkJarPath;
39+
private final String yarnConfDir;
40+
private final String pluginLoadMode;
41+
private final String udfJar;
42+
private final String[] execArgs;
43+
private final Properties confProperties;
44+
private final Properties yarnSessionConfProperties;
45+
private final String addShipFile;
4446

4547
private JobParamsInfo(String mode, String name, String queue, String localPluginRoot, String flinkConfDir, String yarnConfDir,
4648
String pluginLoadMode, String[] execArgs, Properties confProperties, Properties yarnSessionConfProperties,
47-
String udfJar, String flinkJarPath) {
49+
String udfJar, String flinkJarPath, String addShipFile) {
4850
this.mode = mode;
4951
this.name = name;
5052
this.queue = queue;
@@ -57,6 +59,7 @@ private JobParamsInfo(String mode, String name, String queue, String localPlugin
5759
this.yarnSessionConfProperties = yarnSessionConfProperties;
5860
this.udfJar = udfJar;
5961
this.flinkJarPath = flinkJarPath;
62+
this.addShipFile = addShipFile;
6063
}
6164

6265
public String getMode() {
@@ -107,6 +110,10 @@ public String getFlinkJarPath() {
107110
return flinkJarPath;
108111
}
109112

113+
public String getAddShipFile() {
114+
return addShipFile;
115+
}
116+
110117
public static JobParamsInfo.Builder builder() {
111118
return new JobParamsInfo.Builder();
112119
}
@@ -125,6 +132,7 @@ public static class Builder {
125132
private String udfJar;
126133
private Properties confProperties;
127134
private Properties yarnSessionConfProperties;
135+
private String addShipfile;
128136

129137
public JobParamsInfo.Builder setMode(String mode) {
130138
this.mode = mode;
@@ -186,9 +194,15 @@ public JobParamsInfo.Builder setFlinkJarPath(String flinkJarPath) {
186194
return this;
187195
}
188196

197+
public JobParamsInfo.Builder setAddShipfile(String addShipfile) {
198+
this.addShipfile = addShipfile;
199+
return this;
200+
}
201+
189202
public JobParamsInfo build() {
190203
return new JobParamsInfo(mode, name, queue, localPluginRoot, flinkConfDir,
191-
yarnConfDir, pluginLoadMode, execArgs, confProperties, yarnSessionConfProperties, udfJar, flinkJarPath);
204+
yarnConfDir, pluginLoadMode, execArgs, confProperties,
205+
yarnSessionConfProperties, udfJar, flinkJarPath, addShipfile);
192206
}
193207
}
194208

@@ -207,6 +221,7 @@ public String toString() {
207221
", execArgs=" + Arrays.toString(execArgs) +
208222
", confProperties=" + confProperties +
209223
", yarnSessionConfProperties=" + yarnSessionConfProperties +
224+
", addShipFile='" + addShipFile + '\'' +
210225
'}';
211226
}
212227
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnJobClusterExecutor.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.io.File;
4040
import java.net.MalformedURLException;
4141
import java.util.ArrayList;
42+
import java.util.Arrays;
4243
import java.util.List;
4344
import java.util.Optional;
4445

@@ -74,15 +75,23 @@ public void exec() throws Exception {
7475
.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
7576

7677
List<File> shipFiles = getShipFiles(jobParamsInfo.getFlinkJarPath(), jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor);
78+
79+
if (jobParamsInfo.getAddShipFile() != null) {
80+
List<String> addShipFilesPath = parsePathFromStr(jobParamsInfo.getAddShipFile());
81+
for (String path : addShipFilesPath) {
82+
shipFiles.addAll(getShipFiles(path, jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor));
83+
}
84+
}
85+
7786
clusterDescriptor.addShipFiles(shipFiles);
7887

7988
ClusterSpecification clusterSpecification = YarnClusterClientFactory.INSTANCE.getClusterSpecification(flinkConfiguration);
8089
ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
8190

8291
String applicationId = applicationIdClusterClientProvider.getClusterClient().getClusterId().toString();
8392
String flinkJobId = jobGraph.getJobID().toString();
84-
String tips = String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId);
85-
System.out.println(tips);
93+
94+
LOG.info(String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId));
8695
}
8796

8897
private void appendApplicationConfig(Configuration flinkConfig, JobParamsInfo jobParamsInfo) {
@@ -115,9 +124,14 @@ protected List<File> getShipFiles(String flinkJarPath, String pluginLoadMode, Jo
115124

116125
private void dealFlinkLibJar(String flinkJarPath, YarnClusterDescriptor clusterDescriptor, List<File> shipFiles) throws MalformedURLException {
117126
if (StringUtils.isEmpty(flinkJarPath) || !new File(flinkJarPath).exists()) {
118-
throw new RuntimeException("The param '-flinkJarPath' ref dir is not exist");
127+
throw new RuntimeException("path " + flinkJarPath + " is not exist");
119128
}
120129
File[] jars = new File(flinkJarPath).listFiles();
130+
131+
if (jars == null || jars.length == 0) {
132+
throw new RuntimeException(flinkJarPath + " no file exist !");
133+
}
134+
121135
for (File file : jars) {
122136
if (file.toURI().toURL().toString().contains("flink-dist")) {
123137
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
@@ -161,5 +175,11 @@ private Optional<File> discoverLogConfigFile(final String configurationDirectory
161175
return logConfigFile;
162176
}
163177

178+
private static List<String> parsePathFromStr(String pathStr) {
179+
if (pathStr.length() > 2) {
180+
pathStr = pathStr.substring(1, pathStr.length() - 1).replace("\"", "");
181+
}
164182

183+
return Arrays.asList(pathStr.split(","));
184+
}
165185
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Whitespace-only changes.

0 commit comments

Comments
 (0)