Skip to content

Commit 37fb07a

Browse files
committed
tm memory
1 parent 6f2048b commit 37fb07a

3 files changed

Lines changed: 50 additions & 61 deletions

File tree

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnJobClusterExecutor.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.client.deployment.ClusterSpecification;
2828
import org.apache.flink.client.program.ClusterClientProvider;
2929
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.configuration.MemorySize;
31+
import org.apache.flink.configuration.TaskManagerOptions;
3032
import org.apache.flink.runtime.jobgraph.JobGraph;
3133
import org.apache.flink.yarn.YarnClusterDescriptor;
3234
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -53,9 +55,9 @@
5355
public class YarnJobClusterExecutor {
5456
private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
5557

56-
public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
57-
public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
58-
58+
private static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
59+
private static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
60+
private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
5961

6062
YarnClusterClientFactory yarnClusterClientFactory;
6163
JobParamsInfo jobParamsInfo;
@@ -78,7 +80,7 @@ public void exec() throws Exception {
7880
List<File> shipFiles = getShipFiles(jobParamsInfo.getFlinkJarPath(), jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor);
7981
clusterDescriptor.addShipFiles(shipFiles);
8082

81-
ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(jobParamsInfo.getConfProperties());
83+
ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(flinkConfiguration);
8284
ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
8385

8486
String applicationId = applicationIdClusterClientProvider.getClusterClient().getClusterId().toString();
@@ -100,8 +102,10 @@ private void appendApplicationConfig(Configuration flinkConfig, JobParamsInfo jo
100102
discoverLogConfigFile(jobParamsInfo.getFlinkConfDir()).ifPresent(file ->
101103
flinkConfig.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, file.getPath()));
102104
}
103-
//TODO 参数设置
104-
flinkConfig.setString("taskmanager.memory.flink.size","1024m");
105+
106+
if (!flinkConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
107+
flinkConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), DEFAULT_TOTAL_PROCESS_MEMORY);
108+
}
105109
}
106110

107111
protected List<File> getShipFiles(String flinkJarPath, String pluginLoadMode, JobGraph jobGraph, YarnClusterDescriptor clusterDescriptor)

launcher/src/main/java/com/dtstack/flink/sql/launcher/factory/AbstractClusterClientFactory.java

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
package com.dtstack.flink.sql.launcher.factory;
2020

21-
import com.dtstack.flink.sql.util.MathUtil;
2221
import org.apache.flink.client.deployment.ClusterDescriptor;
2322
import org.apache.flink.client.deployment.ClusterSpecification;
2423
import org.apache.flink.configuration.Configuration;
25-
import java.util.Properties;
24+
import org.apache.flink.configuration.ConfigurationUtils;
25+
import org.apache.flink.configuration.TaskManagerOptions;
26+
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
27+
28+
import static org.apache.flink.util.Preconditions.checkNotNull;
2629

2730

2831
/**
@@ -32,42 +35,24 @@
3235
*/
3336
public abstract class AbstractClusterClientFactory {
3437

38+
public ClusterSpecification getClusterSpecification(Configuration configuration) {
39+
checkNotNull(configuration);
3540

36-
public final static int MIN_JM_MEMORY = 768;
37-
public final static int MIN_TM_MEMORY = 1024;
38-
39-
public final static String JOBMANAGER_MEMORY_MB = "jobmanager.memory.mb";
40-
public final static String TASKMANAGER_MEMORY_MB = "taskmanager.memory.mb";
41-
public final static String SLOTS_PER_TASKMANAGER = "taskmanager.slots";
42-
43-
public ClusterSpecification getClusterSpecification(Properties confProperties) {
44-
int jobmanagerMemoryMb = 768;
45-
int taskmanagerMemoryMb = 1024;
46-
int slotsPerTaskManager = 1;
47-
48-
if (confProperties != null) {
49-
if (confProperties.containsKey(JOBMANAGER_MEMORY_MB)) {
50-
jobmanagerMemoryMb = MathUtil.getIntegerVal(confProperties.get(JOBMANAGER_MEMORY_MB));
51-
if (jobmanagerMemoryMb < MIN_JM_MEMORY) {
52-
jobmanagerMemoryMb = MIN_JM_MEMORY;
53-
}
54-
}
55-
56-
if (confProperties.containsKey(TASKMANAGER_MEMORY_MB)) {
57-
taskmanagerMemoryMb = MathUtil.getIntegerVal(confProperties.get(TASKMANAGER_MEMORY_MB));
58-
if (taskmanagerMemoryMb < MIN_TM_MEMORY) {
59-
taskmanagerMemoryMb = MIN_TM_MEMORY;
60-
}
61-
}
41+
final int jobManagerMemoryMB = ConfigurationUtils
42+
.getJobManagerHeapMemory(configuration)
43+
.getMebiBytes();
44+
// taskmanager.memory.process.size
45+
final int taskManagerMemoryMB = TaskExecutorProcessUtils
46+
.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
47+
configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
48+
.getTotalProcessMemorySize()
49+
.getMebiBytes();
6250

63-
if (confProperties.containsKey(SLOTS_PER_TASKMANAGER)) {
64-
slotsPerTaskManager = MathUtil.getIntegerVal(confProperties.get(SLOTS_PER_TASKMANAGER));
65-
}
66-
}
51+
int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
6752

6853
return new ClusterSpecification.ClusterSpecificationBuilder()
69-
.setMasterMemoryMB(jobmanagerMemoryMb)
70-
.setTaskManagerMemoryMB(taskmanagerMemoryMb)
54+
.setMasterMemoryMB(jobManagerMemoryMB)
55+
.setTaskManagerMemoryMB(taskManagerMemoryMB)
7156
.setSlotsPerTaskManager(slotsPerTaskManager)
7257
.createClusterSpecification();
7358
}

pom.xml

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,28 @@
1010
<url>http://maven.apache.org</url>
1111
<modules>
1212
<module>core</module>
13-
<module>kafka09</module>
14-
<module>kafka10</module>
15-
<module>kafka11</module>
16-
<module>kafka</module>
17-
<module>mysql</module>
18-
<module>hbase</module>
19-
<module>elasticsearch5</module>
20-
<module>mongo</module>
21-
<module>redis5</module>
13+
<!--<module>kafka09</module>-->
14+
<!--<module>kafka10</module>-->
15+
<!--<module>kafka11</module>-->
16+
<!--<module>kafka</module>-->
17+
<!--<module>mysql</module>-->
18+
<!--<module>hbase</module>-->
19+
<!--<module>elasticsearch5</module>-->
20+
<!--<module>mongo</module>-->
21+
<!--<module>redis5</module>-->
2222
<module>launcher</module>
23-
<module>rdb</module>
24-
<module>sqlserver</module>
25-
<module>oracle</module>
26-
<module>cassandra</module>
27-
<module>kudu</module>
28-
<module>postgresql</module>
29-
<module>serversocket</module>
30-
<module>console</module>
31-
<module>clickhouse</module>
32-
<module>impala</module>
33-
<module>db2</module>
34-
<module>polardb</module>
23+
<!--<module>rdb</module>-->
24+
<!--<module>sqlserver</module>-->
25+
<!--<module>oracle</module>-->
26+
<!--<module>cassandra</module>-->
27+
<!--<module>kudu</module>-->
28+
<!--<module>postgresql</module>-->
29+
<!--<module>serversocket</module>-->
30+
<!--<module>console</module>-->
31+
<!--<module>clickhouse</module>-->
32+
<!--<module>impala</module>-->
33+
<!--<module>db2</module>-->
34+
<!--<module>polardb</module>-->
3535

3636
</modules>
3737

0 commit comments

Comments
 (0)