3333import org .apache .flink .runtime .jobgraph .JobGraph ;
3434import org .apache .hadoop .yarn .api .records .ApplicationId ;
3535import org .apache .hadoop .yarn .util .ConverterUtils ;
36+ import org .slf4j .Logger ;
37+ import org .slf4j .LoggerFactory ;
3638
3739
/**
 * Submits a pre-built Flink job to an already-running Flink session cluster on YARN,
 * identified by the "yid" (YARN application id) session property.
 *
 * @author maqi
 */
4345public class YarnSessionClusterExecutor {
46+
47+ private static final Logger LOG = LoggerFactory .getLogger (YarnJobClusterExecutor .class );
48+
4449 JobParamsInfo jobParamsInfo ;
4550
4651 public YarnSessionClusterExecutor (JobParamsInfo jobParamsInfo ) {
@@ -52,28 +57,37 @@ public void exec() throws Exception {
5257 Configuration flinkConfiguration = JobGraphBuildUtil .getFlinkConfiguration (jobParamsInfo .getFlinkConfDir (), jobParamsInfo .getConfProperties ());
5358 ClusterDescriptor clusterDescriptor = YarnClusterClientFactory .INSTANCE .createClusterDescriptor (jobParamsInfo .getYarnConfDir (), flinkConfiguration );
5459
55- Object yid = jobParamsInfo .getYarnSessionConfProperties ().get ("yid" );
56- if (null == yid ) {
57- throw new RuntimeException ("yarnSessionMode yid is required" );
58- }
60+ try {
61+ Object yid = jobParamsInfo .getYarnSessionConfProperties ().get ("yid" );
62+ if (null == yid ) {
63+ throw new RuntimeException ("yarnSessionMode yid is required" );
64+ }
5965
60- ApplicationId applicationId = ConverterUtils .toApplicationId (yid .toString ());
61- ClusterClientProvider <ApplicationId > retrieve = clusterDescriptor .retrieve (applicationId );
62- ClusterClient <ApplicationId > clusterClient = retrieve .getClusterClient ();
66+ ApplicationId applicationId = ConverterUtils .toApplicationId (yid .toString ());
67+ ClusterClientProvider <ApplicationId > retrieve = clusterDescriptor .retrieve (applicationId );
68+ ClusterClient <ApplicationId > clusterClient = retrieve .getClusterClient ();
6369
64- if (StringUtils .equalsIgnoreCase (jobParamsInfo .getPluginLoadMode (), EPluginLoadMode .SHIPFILE .name ())) {
65- jobGraph .getUserArtifacts ().clear ();
66- } else {
67- JobGraphBuildUtil .fillJobGraphClassPath (jobGraph );
68- }
70+ if (StringUtils .equalsIgnoreCase (jobParamsInfo .getPluginLoadMode (), EPluginLoadMode .SHIPFILE .name ())) {
71+ jobGraph .getUserArtifacts ().clear ();
72+ } else {
73+ JobGraphBuildUtil .fillJobGraphClassPath (jobGraph );
74+ }
75+
76+ if (!StringUtils .isEmpty (jobParamsInfo .getUdfJar ())) {
77+ JobGraphBuildUtil .fillUserJarForJobGraph (jobParamsInfo .getUdfJar (), jobGraph );
78+ }
6979
70- if (!StringUtils .isEmpty (jobParamsInfo .getUdfJar ())) {
71- JobGraphBuildUtil .fillUserJarForJobGraph (jobParamsInfo .getUdfJar (), jobGraph );
80+ JobExecutionResult jobExecutionResult = ClientUtils .submitJob (clusterClient , jobGraph );
81+ String jobId = jobExecutionResult .getJobID ().toString ();
82+ LOG .info ("jobID:" + jobId );
83+ } finally {
84+ try {
85+ clusterDescriptor .close ();
86+ } catch (Exception e ) {
87+ LOG .info ("Could not properly close the yarn cluster descriptor." , e );
88+ }
7289 }
7390
74- JobExecutionResult jobExecutionResult = ClientUtils .submitJob (clusterClient , jobGraph );
75- String jobId = jobExecutionResult .getJobID ().toString ();
76- System .out .println ("jobID:" + jobId );
7791
7892 }
7993
0 commit comments