|
29 | 29 | import org.apache.flink.yarn.AbstractYarnClusterDescriptor; |
30 | 30 | import org.apache.flink.yarn.YarnClusterDescriptor; |
31 | 31 | import org.apache.hadoop.fs.Path; |
| 32 | +import org.apache.hadoop.security.UserGroupInformation; |
32 | 33 | import org.apache.hadoop.yarn.client.api.YarnClient; |
33 | 34 | import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| 35 | +import org.slf4j.Logger; |
| 36 | +import org.slf4j.LoggerFactory; |
34 | 37 |
|
35 | 38 | import java.io.File; |
| 39 | +import java.io.IOException; |
36 | 40 | import java.net.MalformedURLException; |
37 | 41 | import java.net.URL; |
38 | 42 | import java.util.ArrayList; |
|
48 | 52 | */ |
49 | 53 |
|
50 | 54 | public class PerJobClusterClientBuilder { |
| 55 | + |
| 56 | + private static final Logger LOG = LoggerFactory.getLogger(PerJobClusterClientBuilder.class); |
| 57 | + |
| 58 | + private static String KEYTAB = "security.kerberos.login.keytab"; |
| 59 | + |
| 60 | + private static String PRINCIPAL = "security.kerberos.login.principal"; |
| 61 | + |
51 | 62 | private YarnClient yarnClient; |
52 | 63 |
|
53 | 64 | private YarnConfiguration yarnConf; |
54 | 65 |
|
55 | | - public void init(String yarnConfDir){ |
| 66 | + public void init(String yarnConfDir, Properties conf) throws IOException { |
| 67 | + |
56 | 68 | if(Strings.isNullOrEmpty(yarnConfDir)) { |
57 | 69 | throw new RuntimeException("parameters of yarn is required"); |
58 | 70 | } |
59 | 71 |
|
60 | 72 | yarnConf = YarnConfLoader.getYarnConf(yarnConfDir); |
| 73 | + |
| 74 | + if (isKerberos(conf)){ |
| 75 | + String keytab = (String) conf.get(KEYTAB); |
| 76 | + String principal = (String) conf.get(PRINCIPAL); |
| 77 | + login(yarnConf, keytab, principal); |
| 78 | + } |
| 79 | + |
61 | 80 | yarnClient = YarnClient.createYarnClient(); |
62 | 81 | yarnClient.init(yarnConf); |
63 | 82 | yarnClient.start(); |
@@ -141,4 +160,23 @@ private AbstractYarnClusterDescriptor getClusterDescriptor( |
141 | 160 | yarnClient, |
142 | 161 | false); |
143 | 162 | } |
| 163 | + |
| 164 | + private boolean isKerberos(Properties conf){ |
| 165 | + String keytab = (String) conf.get(KEYTAB); |
| 166 | + if (StringUtils.isNotBlank(keytab)){ |
| 167 | + return true; |
| 168 | + } else { |
| 169 | + return false; |
| 170 | + } |
| 171 | + } |
| 172 | + |
| 173 | + private void login(org.apache.hadoop.conf.Configuration conf, String keytab, String principal) throws IOException { |
| 174 | + if (StringUtils.isEmpty(principal)){ |
| 175 | + throw new RuntimeException(PRINCIPAL + " must not be null!"); |
| 176 | + } |
| 177 | + UserGroupInformation.setConfiguration(conf); |
| 178 | + UserGroupInformation.loginUserFromKeytab(principal, keytab); |
| 179 | + LOG.info("login successfully! keytab: " + keytab + "principal: " + principal); |
| 180 | + LOG.info("UGI: " + UserGroupInformation.getCurrentUser()); |
| 181 | + } |
144 | 182 | } |
0 commit comments