Skip to content

Commit 12c7ea3

Browse files
committed
添加对mode 的注释
修改参数未配置提示
1 parent 9c62b3f commit 12c7ea3

2 files changed

Lines changed: 24 additions & 22 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/enums/ClusterMode.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,14 @@
2424
*/
2525
public enum ClusterMode {
2626

27-
local(0),standalone(1),yarn(2),yarnPer(3);
27+
//run in local
28+
local(0),
29+
//submit job to standalone cluster
30+
standalone(1),
31+
//submit job to flink-session which is already run on yarn
32+
yarn(2),
33+
//submit job to yarn cluster as an application
34+
yarnPer(3);
2835

2936
private int type;
3037

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.launcher;
2020

21+
import com.dtstack.flink.sql.enums.ClusterMode;
2122
import com.dtstack.flink.sql.option.Options;
2223
import com.dtstack.flink.sql.util.PluginUtil;
2324
import org.apache.commons.io.Charsets;
@@ -27,39 +28,29 @@
2728
import org.apache.flink.configuration.ConfigConstants;
2829
import org.apache.flink.configuration.Configuration;
2930
import org.apache.flink.configuration.GlobalConfiguration;
30-
import org.apache.flink.configuration.HighAvailabilityOptions;
3131
import org.apache.flink.configuration.JobManagerOptions;
3232
import org.apache.flink.core.fs.FileSystem;
3333
import org.apache.flink.runtime.akka.AkkaUtils;
34-
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
3534
import org.apache.flink.runtime.minicluster.MiniCluster;
3635
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
3736
import org.apache.flink.runtime.util.LeaderConnectionInfo;
38-
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
3937
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
4038
import org.apache.flink.yarn.YarnClusterDescriptor;
41-
import org.apache.hadoop.fs.Path;
4239
import org.apache.hadoop.yarn.api.records.ApplicationId;
4340
import org.apache.hadoop.yarn.api.records.ApplicationReport;
4441
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
4542
import org.apache.hadoop.yarn.client.api.YarnClient;
46-
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
4743
import org.apache.hadoop.yarn.conf.YarnConfiguration;
44+
import org.apache.hadoop.yarn.util.StringHelper;
4845

49-
import java.io.File;
5046
import java.net.InetSocketAddress;
5147
import java.net.URLDecoder;
52-
import java.util.*;
53-
54-
import com.dtstack.flink.sql.enums.ClusterMode;
55-
import org.apache.hadoop.yarn.exceptions.YarnException;
56-
import org.apache.hadoop.yarn.util.StringHelper;
57-
58-
import java.io.IOException;
59-
import java.util.stream.Collectors;
60-
import java.util.stream.Stream;
61-
62-
import static java.util.Objects.requireNonNull;
48+
import java.util.EnumSet;
49+
import java.util.HashSet;
50+
import java.util.Iterator;
51+
import java.util.List;
52+
import java.util.Properties;
53+
import java.util.Set;
6354

6455
/**
6556
* @author sishu.yss
@@ -71,7 +62,7 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
7162
if (mode.equals(ClusterMode.standalone.name())) {
7263
return createStandaloneClient(launcherOptions);
7364
} else if (mode.equals(ClusterMode.yarn.name())) {
74-
return createYarnClient(launcherOptions, mode);
65+
return createYarnSessionClient(launcherOptions, mode);
7566
}
7667
throw new IllegalArgumentException("Unsupported cluster client type: ");
7768
}
@@ -91,14 +82,16 @@ public static ClusterClient createStandaloneClient(Options launcherOptions) thro
9182
return clusterClient;
9283
}
9384

94-
public static ClusterClient createYarnClient(Options launcherOptions, String mode) {
85+
public static ClusterClient createYarnSessionClient(Options launcherOptions, String mode) {
86+
9587
String flinkConfDir = launcherOptions.getFlinkconf();
9688
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
9789
String yarnConfDir = launcherOptions.getYarnconf();
90+
9891
if (StringUtils.isNotBlank(yarnConfDir)) {
9992
try {
10093
config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
101-
FileSystem.initialize(config);
94+
FileSystem.initialize(config, null);
10295

10396
YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
10497
YarnClient yarnClient = YarnClient.createYarnClient();
@@ -116,6 +109,7 @@ public static ClusterClient createYarnClient(Options launcherOptions, String mod
116109
} else {
117110
applicationId = getYarnClusterApplicationId(yarnClient);
118111
}
112+
119113
System.out.println("applicationId=" + applicationId.toString());
120114

121115
if (StringUtils.isEmpty(applicationId.toString())) {
@@ -129,8 +123,9 @@ public static ClusterClient createYarnClient(Options launcherOptions, String mod
129123
} catch (Exception e) {
130124
throw new RuntimeException(e);
131125
}
126+
}else{
127+
throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
132128
}
133-
throw new UnsupportedOperationException("Haven't been developed yet!");
134129
}
135130

136131

0 commit comments

Comments
 (0)