Skip to content

Commit a797fee

Browse files
committed
complete impala kerberos
1 parent 07fdd63 commit a797fee

8 files changed

Lines changed: 260 additions & 94 deletions

File tree

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.security.UserGroupInformation;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* @program: flinkStreamSQL
28+
* @author: wuren
29+
* @create: 2020/09/14
30+
**/
31+
public class Krb5Utils {
32+
33+
public static final String KRB5_CONF_KEY = "java.security.krb5.conf";
34+
public static final String HADOOP_AUTH_KEY = "hadoop.security.authentication";
35+
public static final String KRB_STR = "Kerberos";
36+
37+
public static UserGroupInformation getUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
38+
System.setProperty(KRB5_CONF_KEY, krb5confPath);
39+
Configuration configuration = new Configuration();
40+
configuration.set(HADOOP_AUTH_KEY , KRB_STR);
41+
UserGroupInformation.setConfiguration(configuration);
42+
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
43+
}
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.util;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* @program: flinkStreamSQL
28+
* @author: wuren
29+
* @create: 2020/09/14
30+
**/
31+
public class Krb5UtilsTest {
32+
@Test
33+
public void testGetUgi() throws IOException {
34+
String principal = "";
35+
String keytabPath = "";
36+
String krb5confPath = "";
37+
try {
38+
Krb5Utils.getUgi(principal, keytabPath, krb5confPath);
39+
} catch (IllegalArgumentException e) {
40+
Assert.assertEquals(e.getMessage(), "Can't get Kerberos realm");
41+
}
42+
43+
}
44+
}

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2525
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
2626
import com.dtstack.flink.sql.util.JDBCUtils;
27+
import com.dtstack.flink.sql.util.Krb5Utils;
2728
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2829
import org.apache.hadoop.conf.Configuration;
2930
import org.apache.hadoop.security.UserGroupInformation;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

3334
import java.io.IOException;
35+
import java.security.PrivilegedAction;
3436
import java.sql.Connection;
3537
import java.sql.DriverManager;
38+
import java.sql.SQLException;
3639
import java.util.List;
3740

3841
/**
@@ -61,10 +64,29 @@ public ImpalaAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInf
6164
@Override
6265
public Connection getConn(String dbUrl, String userName, String password) {
6366
try {
64-
Connection connection ;
67+
Connection connection;
6568
String url = getUrl();
6669
JDBCUtils.forName(IMPALA_DRIVER, getClass().getClassLoader());
67-
connection = DriverManager.getConnection(url);
70+
// Kerberos
71+
if (impalaSideTableInfo.getAuthMech() == 1) {
72+
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
73+
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
74+
String principal = impalaSideTableInfo.getPrincipal();
75+
UserGroupInformation ugi = Krb5Utils.getUgi(principal, keyTabFilePath, krb5FilePath);
76+
connection = ugi.doAs(new PrivilegedAction<Connection>() {
77+
@Override
78+
public Connection run() {
79+
try {
80+
return DriverManager.getConnection(url);
81+
} catch (SQLException e) {
82+
e.printStackTrace();
83+
}
84+
return null;
85+
}
86+
});
87+
} else {
88+
connection = DriverManager.getConnection(url);
89+
}
6890
connection.setAutoCommit(false);
6991
return connection;
7092
} catch (Exception e) {
@@ -83,9 +105,6 @@ public String getUrl() throws IOException {
83105
newUrl = urlBuffer.toString();
84106

85107
} else if (authMech == 1) {
86-
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
87-
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
88-
String principal = impalaSideTableInfo.getPrincipal();
89108
String krbRealm = impalaSideTableInfo.getKrbRealm();
90109
String krbHostFQDN = impalaSideTableInfo.getKrbHostFQDN();
91110
String krbServiceName = impalaSideTableInfo.getKrbServiceName();
@@ -96,11 +115,7 @@ public String getUrl() throws IOException {
96115
.concat("KrbServiceName=").concat(krbServiceName).concat(";")
97116
);
98117
newUrl = urlBuffer.toString();
99-
System.setProperty("java.security.krb5.conf", krb5FilePath);
100-
Configuration configuration = new Configuration();
101-
configuration.set("hadoop.security.authentication" , "Kerberos");
102-
UserGroupInformation.setConfiguration(configuration);
103-
UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
118+
104119

105120
} else if (authMech == 2) {
106121
String uName = impalaSideTableInfo.getUserName();

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,33 @@
1818

1919
package com.dtstack.flink.sql.side.impala;
2020

21-
import com.dtstack.flink.sql.factory.DTThreadFactory;
2221
import com.dtstack.flink.sql.side.FieldInfo;
2322
import com.dtstack.flink.sql.side.JoinInfo;
2423
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2524
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
2625
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncReqRow;
26+
import com.dtstack.flink.sql.util.Krb5Utils;
2727
import io.vertx.core.Vertx;
2828
import io.vertx.core.VertxOptions;
2929
import io.vertx.core.json.JsonObject;
3030
import io.vertx.ext.jdbc.JDBCClient;
31+
import io.vertx.ext.sql.SQLClient;
3132
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3233
import org.apache.flink.configuration.Configuration;
34+
import org.apache.flink.streaming.api.functions.async.ResultFuture;
35+
import org.apache.flink.table.dataformat.BaseRow;
36+
import org.apache.flink.types.Row;
3337
import org.apache.hadoop.security.UserGroupInformation;
3438
import org.slf4j.Logger;
3539
import org.slf4j.LoggerFactory;
3640

37-
import java.io.IOException;
41+
import java.security.PrivilegedAction;
3842
import java.util.List;
39-
import java.util.concurrent.LinkedBlockingQueue;
40-
import java.util.concurrent.ThreadPoolExecutor;
41-
import java.util.concurrent.TimeUnit;
43+
import java.util.Map;
44+
import java.util.concurrent.CountDownLatch;
45+
46+
import java.util.concurrent.atomic.AtomicBoolean;
47+
import java.util.concurrent.atomic.AtomicLong;
4248

4349
/**
4450
* Date: 2019/11/12
@@ -53,27 +59,40 @@ public class ImpalaAsyncReqRow extends RdbAsyncReqRow {
5359

5460
private final static String IMPALA_DRIVER = "com.cloudera.impala.jdbc41.Driver";
5561

62+
protected UserGroupInformation ugi = null;
5663

5764
public ImpalaAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
5865
super(new ImpalaAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
5966
}
6067

6168
@Override
6269
public void open(Configuration parameters) throws Exception {
63-
super.open(parameters);
6470
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
71+
if (impalaSideTableInfo.getAuthMech() == 1) {
72+
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
73+
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
74+
String principal = impalaSideTableInfo.getPrincipal();
75+
ugi = Krb5Utils.getUgi(principal, keyTabFilePath, krb5FilePath);
76+
openJdbc(parameters);
77+
} else {
78+
openJdbc(parameters);
79+
}
80+
}
6581

82+
public void openJdbc(Configuration parameters) throws Exception {
83+
super.open(parameters);
84+
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
6685
JsonObject impalaClientConfig = new JsonObject();
6786
impalaClientConfig.put("url", getUrl())
68-
.put("driver_class", IMPALA_DRIVER)
69-
.put("max_pool_size", impalaSideTableInfo.getAsyncPoolSize())
70-
.put("provider_class", DT_PROVIDER_CLASS)
71-
.put("idle_connection_test_period", 300)
72-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
73-
.put("max_idle_time", 600)
74-
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
75-
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
76-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
87+
.put("driver_class", IMPALA_DRIVER)
88+
.put("max_pool_size", impalaSideTableInfo.getAsyncPoolSize())
89+
.put("provider_class", DT_PROVIDER_CLASS)
90+
.put("idle_connection_test_period", 300)
91+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN)
92+
.put("max_idle_time", 600)
93+
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
94+
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
95+
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
7796

7897
System.setProperty("vertx.disableFileCPResolving", "true");
7998

@@ -85,7 +104,6 @@ public void open(Configuration parameters) throws Exception {
85104
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
86105
}
87106

88-
89107
public String getUrl() {
90108
ImpalaSideTableInfo impalaSideTableInfo = (ImpalaSideTableInfo) sideInfo.getSideTableInfo();
91109

@@ -95,11 +113,7 @@ public String getUrl() {
95113
StringBuffer urlBuffer = new StringBuffer(impalaSideTableInfo.getUrl());
96114
if (authMech == 0) {
97115
newUrl = urlBuffer.toString();
98-
99116
} else if (authMech == 1) {
100-
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
101-
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
102-
String principal = impalaSideTableInfo.getPrincipal();
103117
String krbRealm = impalaSideTableInfo.getKrbRealm();
104118
String krbHostFQDN = impalaSideTableInfo.getKrbHostFQDN();
105119
String krbServiceName = impalaSideTableInfo.getKrbServiceName();
@@ -110,16 +124,6 @@ public String getUrl() {
110124
.concat("KrbServiceName=").concat(krbServiceName).concat(";")
111125
);
112126
newUrl = urlBuffer.toString();
113-
System.setProperty("java.security.krb5.conf", krb5FilePath);
114-
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
115-
configuration.set("hadoop.security.authentication" , "Kerberos");
116-
UserGroupInformation.setConfiguration(configuration);
117-
try {
118-
UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
119-
} catch (IOException e) {
120-
throw new RuntimeException("kerberos login fail! e: " + e);
121-
}
122-
123127
} else if (authMech == 2) {
124128
String uName = impalaSideTableInfo.getUserName();
125129
urlBuffer.append(";"
@@ -129,7 +133,6 @@ public String getUrl() {
129133
.concat("UseSasl=0")
130134
);
131135
newUrl = urlBuffer.toString();
132-
133136
} else if (authMech == 3) {
134137
String uName = impalaSideTableInfo.getUserName();
135138
String pwd = impalaSideTableInfo.getPassword();
@@ -139,11 +142,41 @@ public String getUrl() {
139142
.concat("PWD=").concat(pwd)
140143
);
141144
newUrl = urlBuffer.toString();
142-
143145
} else {
144146
throw new IllegalArgumentException("The value of authMech is illegal, Please select 0, 1, 2, 3");
145147
}
146-
147148
return newUrl;
148149
}
150+
151+
@Override
152+
protected void asyncQueryData(Map<String, Object> inputParams,
153+
Row input,
154+
ResultFuture<BaseRow> resultFuture,
155+
SQLClient rdbSqlClient,
156+
AtomicLong failCounter,
157+
AtomicBoolean finishFlag,
158+
CountDownLatch latch) {
159+
if (ugi == null) {
160+
doAsyncQueryData(inputParams,
161+
input, resultFuture,
162+
rdbSqlClient,
163+
failCounter,
164+
finishFlag,
165+
latch);
166+
} else {
167+
// Kerberos
168+
ugi.doAs(new PrivilegedAction<Object>() {
169+
@Override
170+
public Object run() {
171+
doAsyncQueryData(inputParams,
172+
input, resultFuture,
173+
rdbSqlClient,
174+
failCounter,
175+
finishFlag,
176+
latch);
177+
return null;
178+
}
179+
});
180+
}
181+
}
149182
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
44
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
5-
import org.apache.hadoop.conf.Configuration;
5+
import com.dtstack.flink.sql.util.Krb5Utils;
66
import org.apache.hadoop.security.UserGroupInformation;
77

88
import java.io.IOException;
9+
import java.security.PrivilegedAction;
910
import java.util.List;
1011

1112
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -52,17 +53,17 @@ public ImpalaOutputFormat(
5253

5354
@Override
5455
public void open(int taskNumber, int numTasks) throws IOException {
55-
super.open(taskNumber, numTasks);
5656
if (authMech == 1) {
57-
System.setProperty("java.security.krb5.conf", krb5confPath);
58-
Configuration configuration = new Configuration();
59-
configuration.set("hadoop.security.authentication", "Kerberos");
60-
UserGroupInformation.setConfiguration(configuration);
61-
try {
62-
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
63-
} catch (IOException e) {
64-
throw new RuntimeException("loginUserFromKeytab error ..", e);
65-
}
57+
UserGroupInformation ugi = Krb5Utils.getUgi(principal, keytabPath, krb5confPath);
58+
ugi.doAs(new PrivilegedAction<Object>() {
59+
@Override
60+
public Object run() {
61+
openJdbc();
62+
return null;
63+
}
64+
});
65+
} else {
66+
super.open(taskNumber, numTasks);
6667
}
6768
}
6869

0 commit comments

Comments
 (0)