Skip to content

Commit d7726d4

Browse files
committed
Add standalone run mode: support submitting jobs to a standalone Flink cluster
1 parent 80d3127 commit d7726d4

8 files changed

Lines changed: 177 additions & 34 deletions

File tree

core/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
<calcite.server.version>1.16.0</calcite.server.version>
2121
<jackson.version>2.7.9</jackson.version>
2222
<guava.version>19.0</guava.version>
23+
<logback.classic.version>1.1.7</logback.classic.version>
2324
</properties>
2425

2526
<dependencies>
@@ -117,6 +118,13 @@
117118
<version>2.5</version>
118119
</dependency>
119120

121+
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
122+
<dependency>
123+
<groupId>ch.qos.logback</groupId>
124+
<artifactId>logback-classic</artifactId>
125+
<version>${logback.classic.version}</version>
126+
</dependency>
127+
120128

121129
</dependencies>
122130

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.dtstack.flink.sql.launcher;
2121

2222
import com.dtstack.flink.sql.launcher.entity.JobParamsInfo;
23+
import com.dtstack.flink.sql.launcher.executor.StandaloneExecutor;
2324
import com.dtstack.flink.sql.launcher.executor.YarnJobClusterExecutor;
2425
import com.dtstack.flink.sql.launcher.executor.YarnSessionClusterExecutor;
2526
import com.alibaba.fastjson.JSON;
@@ -143,6 +144,8 @@ public static void main(String[] args) throws Exception {
143144
new YarnJobClusterExecutor(jobParamsInfo).exec();
144145
break;
145146
case standalone:
147+
new StandaloneExecutor(jobParamsInfo).exec();
148+
break;
146149
default:
147150
throw new RuntimeException("Unsupported operating mode, please use local,yarn,yarnPer");
148151
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.launcher.executor;
20+
21+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
22+
import com.dtstack.flink.sql.launcher.entity.JobParamsInfo;
23+
import com.dtstack.flink.sql.launcher.factory.StandaloneClientFactory;
24+
import com.dtstack.flink.sql.launcher.utils.JobGraphBuildUtil;
25+
import org.apache.commons.lang3.StringUtils;
26+
import org.apache.flink.api.common.JobExecutionResult;
27+
import org.apache.flink.client.ClientUtils;
28+
import org.apache.flink.client.deployment.ClusterDescriptor;
29+
import org.apache.flink.client.deployment.StandaloneClusterId;
30+
import org.apache.flink.client.program.ClusterClient;
31+
import org.apache.flink.client.program.ClusterClientProvider;
32+
import org.apache.flink.configuration.Configuration;
33+
import org.apache.flink.runtime.jobgraph.JobGraph;
34+
import org.apache.flink.util.Preconditions;
35+
36+
/**
37+
* Date: 2020/3/6
38+
* Company: www.dtstack.com
39+
* @author maqi
40+
*/
41+
public class StandaloneExecutor {
42+
43+
StandaloneClientFactory standaloneClientFactory;
44+
JobParamsInfo jobParamsInfo;
45+
46+
public StandaloneExecutor(JobParamsInfo jobParamsInfo) {
47+
this.jobParamsInfo = jobParamsInfo;
48+
standaloneClientFactory = new StandaloneClientFactory();
49+
}
50+
51+
public void exec() throws Exception {
52+
53+
Preconditions.checkArgument(!StringUtils.equalsIgnoreCase(jobParamsInfo.getPluginLoadMode(), EPluginLoadMode.CLASSPATH.name()),
54+
"standalone only supports classpath mode");
55+
56+
JobGraph jobGraph = JobGraphBuildUtil.buildJobGraph(jobParamsInfo);
57+
Configuration flinkConfiguration = JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir(), jobParamsInfo.getConfProperties());
58+
59+
if (!StringUtils.isBlank(jobParamsInfo.getUdfJar())) {
60+
JobGraphBuildUtil.fillUserJarForJobGraph(jobParamsInfo.getUdfJar(), jobGraph);
61+
}
62+
63+
JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);
64+
65+
ClusterDescriptor clusterDescriptor = standaloneClientFactory.createClusterDescriptor("", flinkConfiguration);
66+
ClusterClientProvider clusterClientProvider = clusterDescriptor.retrieve(StandaloneClusterId.getInstance());
67+
ClusterClient clusterClient = clusterClientProvider.getClusterClient();
68+
69+
70+
JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
71+
String jobID = jobExecutionResult.getJobID().toString();
72+
System.out.println("jobID:" + jobID);
73+
74+
}
75+
76+
77+
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnJobClusterExecutor.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.client.deployment.ClusterSpecification;
2828
import org.apache.flink.client.program.ClusterClientProvider;
2929
import org.apache.flink.configuration.Configuration;
30-
import org.apache.flink.configuration.MemorySize;
3130
import org.apache.flink.configuration.TaskManagerOptions;
3231
import org.apache.flink.runtime.jobgraph.JobGraph;
3332
import org.apache.flink.yarn.YarnClusterDescriptor;
@@ -72,6 +71,7 @@ public void exec() throws Exception {
7271
if (!StringUtils.isBlank(jobParamsInfo.getUdfJar())) {
7372
JobGraphBuildUtil.fillUserJarForJobGraph(jobParamsInfo.getUdfJar(), jobGraph);
7473
}
74+
7575
Configuration flinkConfiguration = JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir(), jobParamsInfo.getConfProperties());
7676
appendApplicationConfig(flinkConfiguration, jobParamsInfo);
7777

@@ -134,36 +134,16 @@ private void dealFlinkLibJar(String flinkJarPath, YarnClusterDescriptor clusterD
134134
private void dealUserJarByPluginLoadMode(String pluginLoadMode, JobGraph jobGraph, List<File> shipFiles) throws MalformedURLException {
135135
// classpath , all node need contain plugin jar
136136
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
137-
fillJobGraphClassPath(jobGraph);
137+
JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);
138138
} else if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.SHIPFILE.name())) {
139-
List<File> pluginPaths = getPluginPathToShipFiles(jobGraph);
139+
List<File> pluginPaths = JobGraphBuildUtil.getPluginPathToShipFiles(jobGraph);
140140
shipFiles.addAll(pluginPaths);
141141
} else {
142142
throw new IllegalArgumentException("Unsupported plugin loading mode " + pluginLoadMode
143143
+ " Currently only classpath and shipfile are supported.");
144144
}
145145
}
146146

147-
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
148-
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
149-
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()) {
150-
if (tmp.getKey().startsWith("class_path")) {
151-
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
152-
}
153-
}
154-
}
155-
156-
private List<File> getPluginPathToShipFiles(JobGraph jobGraph) {
157-
List<File> shipFiles = new ArrayList<>();
158-
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
159-
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()) {
160-
if (tmp.getKey().startsWith("class_path")) {
161-
shipFiles.add(new File(tmp.getValue().filePath));
162-
}
163-
}
164-
return shipFiles;
165-
}
166-
167147

168148
private Optional<File> discoverLogConfigFile(final String configurationDirectory) {
169149
Optional<File> logConfigFile = Optional.empty();

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnSessionClusterExecutor.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.launcher.executor;
2020

2121

22+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2223
import com.dtstack.flink.sql.launcher.entity.JobParamsInfo;
2324
import com.dtstack.flink.sql.launcher.factory.YarnClusterClientFactory;
2425
import com.dtstack.flink.sql.launcher.utils.JobGraphBuildUtil;
@@ -63,20 +64,21 @@ public void exec() throws Exception {
6364
ClusterClientProvider<ApplicationId> retrieve = clusterDescriptor.retrieve(applicationId);
6465
ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient();
6566

66-
removeSqlPluginAndFillUdfJar(jobGraph, jobParamsInfo.getUdfJar());
67+
if (StringUtils.equalsIgnoreCase(jobParamsInfo.getPluginLoadMode(), EPluginLoadMode.SHIPFILE.name())) {
68+
jobGraph.getUserArtifacts().clear();
69+
} else {
70+
JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);
71+
}
72+
73+
if (!StringUtils.isEmpty(jobParamsInfo.getUdfJar())) {
74+
JobGraphBuildUtil.fillUserJarForJobGraph(jobParamsInfo.getUdfJar(), jobGraph);
75+
}
6776

6877
JobExecutionResult jobExecutionResult = ClientUtils.submitJob(clusterClient, jobGraph);
6978
String jobID = jobExecutionResult.getJobID().toString();
7079
System.out.println("jobID:" + jobID);
7180

7281
}
7382

74-
private void removeSqlPluginAndFillUdfJar(JobGraph jobGraph, String udfJar) throws UnsupportedEncodingException {
75-
jobGraph.getUserArtifacts().clear();
76-
jobGraph.getUserJars().clear();
77-
if (!StringUtils.isEmpty(udfJar)) {
78-
JobGraphBuildUtil.fillUserJarForJobGraph(udfJar, jobGraph);
79-
}
80-
}
8183

8284
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.launcher.factory;
20+
21+
import org.apache.flink.client.deployment.ClusterDescriptor;
22+
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
23+
import org.apache.flink.configuration.Configuration;
24+
25+
import static org.apache.flink.util.Preconditions.checkNotNull;
26+
27+
/**
28+
* Date: 2020/3/6
29+
* Company: www.dtstack.com
30+
* @author maqi
31+
*/
32+
public class StandaloneClientFactory extends AbstractClusterClientFactory {
33+
@Override
34+
public ClusterDescriptor createClusterDescriptor(String clusterConfPath, Configuration flinkConfig) {
35+
checkNotNull(flinkConfig);
36+
return new StandaloneClusterDescriptor(flinkConfig);
37+
}
38+
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/factory/YarnClusterClientFactory.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,12 @@ public ClusterDescriptor createClusterDescriptor(String yarnConfDir, Configurati
4848
try {
4949
flinkConfig.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
5050
FileSystem.initialize(flinkConfig, null);
51-
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
52-
5351

5452
YarnConfiguration yarnConf = getYarnConf(yarnConfDir);
5553
YarnClient yarnClient = YarnClient.createYarnClient();
5654
yarnClient.init(yarnConf);
5755
yarnClient.start();
5856

59-
6057
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
6158
flinkConfig,
6259
yarnConf,

launcher/src/main/java/com/dtstack/flink/sql/launcher/utils/JobGraphBuildUtil.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import com.dtstack.flink.sql.launcher.entity.JobParamsInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
2424
import com.dtstack.flink.sql.util.PluginUtil;
25+
import com.google.common.collect.Lists;
2526
import org.apache.commons.io.Charsets;
2627
import org.apache.commons.lang.BooleanUtils;
2728
import org.apache.commons.lang.StringUtils;
29+
import org.apache.commons.lang.exception.ExceptionUtils;
30+
import org.apache.flink.api.common.cache.DistributedCache;
2831
import org.apache.flink.client.program.PackagedProgram;
2932
import org.apache.flink.client.program.PackagedProgramUtils;
3033
import org.apache.flink.configuration.Configuration;
@@ -35,10 +38,17 @@
3538

3639
import java.io.File;
3740
import java.io.UnsupportedEncodingException;
41+
import java.net.MalformedURLException;
42+
import java.net.URL;
3843
import java.net.URLDecoder;
44+
import java.util.ArrayList;
3945
import java.util.Arrays;
46+
import java.util.Collections;
4047
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Optional;
4150
import java.util.Properties;
51+
import java.util.stream.Collectors;
4252

4353
/**
4454
* build JobGraph by JobParamsInfo
@@ -102,4 +112,32 @@ public static void fillUserJarForJobGraph(String jarPath, JobGraph jobGraph) thr
102112
List<String> paths = Arrays.asList(addjarPath.split(","));
103113
paths.forEach(path -> jobGraph.addJar(new Path("file://" + path)));
104114
}
115+
116+
public static void fillJobGraphClassPath(JobGraph jobGraph) {
117+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
118+
List<URL> classPath = jobCacheFileConfig.entrySet().stream()
119+
.filter(tmp -> tmp.getKey().startsWith("class_path"))
120+
.map(tmp -> {
121+
try {
122+
return new URL("file:" + tmp.getValue().filePath);
123+
} catch (MalformedURLException e) {
124+
throw new RuntimeException(ExceptionUtils.getFullStackTrace(e));
125+
}
126+
})
127+
.collect(Collectors.toList());
128+
jobGraph.getUserArtifacts().clear();
129+
jobGraph.setClasspaths(classPath);
130+
}
131+
132+
public static List<File> getPluginPathToShipFiles(JobGraph jobGraph) {
133+
List<File> shipFiles = new ArrayList<>();
134+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
135+
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()) {
136+
if (tmp.getKey().startsWith("class_path")) {
137+
shipFiles.add(new File(tmp.getValue().filePath));
138+
}
139+
}
140+
jobGraph.getUserArtifacts().clear();
141+
return shipFiles;
142+
}
105143
}

0 commit comments

Comments
 (0)