Skip to content

Commit 07fdd63

Browse files
committed
fix Impala sink kerberos error
1 parent 2c787cb commit 07fdd63

2 files changed

Lines changed: 202 additions & 16 deletions

File tree

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package com.dtstack.flink.sql.sink.impala;
2+
3+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
4+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
5+
import org.apache.hadoop.conf.Configuration;
6+
import org.apache.hadoop.security.UserGroupInformation;
7+
8+
import java.io.IOException;
9+
import java.util.List;
10+
11+
import static org.apache.flink.util.Preconditions.checkNotNull;
12+
13+
/**
14+
* @program: flinkStreamSQL
15+
* @author: wuren
16+
* @create: 2020/09/11
17+
**/
18+
public class ImpalaOutputFormat extends JDBCUpsertOutputFormat {
19+
private String keytabPath;
20+
private String krb5confPath;
21+
private String principal;
22+
private Integer authMech;
23+
24+
public ImpalaOutputFormat(
25+
JDBCOptions options,
26+
String[] fieldNames,
27+
String[] keyFields,
28+
String[] partitionFields,
29+
int[] fieldTypes,
30+
int flushMaxSize,
31+
long flushIntervalMills,
32+
boolean allReplace,
33+
String updateMode,
34+
Integer authMech,
35+
String keytabPath,
36+
String krb5confPath,
37+
String principal) {
38+
super(options,
39+
fieldNames,
40+
keyFields,
41+
partitionFields,
42+
fieldTypes,
43+
flushMaxSize,
44+
flushIntervalMills,
45+
allReplace,
46+
updateMode);
47+
this.authMech = authMech;
48+
this.keytabPath = keytabPath;
49+
this.krb5confPath = krb5confPath;
50+
this.principal = principal;
51+
}
52+
53+
@Override
54+
public void open(int taskNumber, int numTasks) throws IOException {
55+
super.open(taskNumber, numTasks);
56+
if (authMech == 1) {
57+
System.setProperty("java.security.krb5.conf", krb5confPath);
58+
Configuration configuration = new Configuration();
59+
configuration.set("hadoop.security.authentication", "Kerberos");
60+
UserGroupInformation.setConfiguration(configuration);
61+
try {
62+
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
63+
} catch (IOException e) {
64+
throw new RuntimeException("loginUserFromKeytab error ..", e);
65+
}
66+
}
67+
}
68+
69+
public static Builder impalaBuilder() {
70+
return new Builder();
71+
}
72+
73+
public static class Builder {
74+
private Integer authMech;
75+
private String keytabPath;
76+
private String krb5confPath;
77+
private String principal;
78+
79+
protected JDBCOptions options;
80+
protected String[] fieldNames;
81+
protected String[] keyFields;
82+
protected String[] partitionFields;
83+
protected int[] fieldTypes;
84+
protected int flushMaxSize = DEFAULT_FLUSH_MAX_SIZE;
85+
protected long flushIntervalMills = DEFAULT_FLUSH_INTERVAL_MILLS;
86+
protected boolean allReplace = DEFAULT_ALLREPLACE_VALUE;
87+
protected String updateMode;
88+
89+
/**
90+
* required, jdbc options.
91+
*/
92+
public Builder setOptions(JDBCOptions options) {
93+
this.options = options;
94+
return this;
95+
}
96+
97+
/**
98+
* required, field names of this jdbc sink.
99+
*/
100+
public Builder setFieldNames(String[] fieldNames) {
101+
this.fieldNames = fieldNames;
102+
return this;
103+
}
104+
105+
/**
106+
* required, upsert unique keys.
107+
*/
108+
public Builder setKeyFields(List<String> keyFields) {
109+
this.keyFields = keyFields == null ? null : keyFields.toArray(new String[keyFields.size()]);
110+
return this;
111+
}
112+
113+
/**
114+
* required, field types of this jdbc sink.
115+
*/
116+
public Builder setFieldTypes(int[] fieldTypes) {
117+
this.fieldTypes = fieldTypes;
118+
return this;
119+
}
120+
121+
/**
122+
* optional, partition Fields
123+
*
124+
* @param partitionFields
125+
* @return
126+
*/
127+
public Builder setPartitionFields(String[] partitionFields) {
128+
this.partitionFields = partitionFields;
129+
return this;
130+
}
131+
132+
/**
133+
* optional, flush max size (includes all append, upsert and delete records),
134+
* over this number of records, will flush data.
135+
*/
136+
public Builder setFlushMaxSize(int flushMaxSize) {
137+
this.flushMaxSize = flushMaxSize;
138+
return this;
139+
}
140+
141+
/**
142+
* optional, flush interval mills, over this time, asynchronous threads will flush data.
143+
*/
144+
public Builder setFlushIntervalMills(long flushIntervalMills) {
145+
this.flushIntervalMills = flushIntervalMills;
146+
return this;
147+
}
148+
149+
public Builder setAllReplace(boolean allReplace) {
150+
this.allReplace = allReplace;
151+
return this;
152+
}
153+
154+
public Builder setUpdateMode(String updateMode) {
155+
this.updateMode = updateMode;
156+
return this;
157+
}
158+
159+
public Builder setAuthMech(Integer authMech) {
160+
this.authMech = authMech;
161+
return this;
162+
}
163+
public Builder setKeytabPath(String keytabPath) {
164+
this.keytabPath = keytabPath;
165+
return this;
166+
}
167+
public Builder setKrb5confPath(String krb5confPath) {
168+
this.krb5confPath = krb5confPath;
169+
return this;
170+
}
171+
public Builder setPrincipal(String principal) {
172+
this.principal = principal;
173+
return this;
174+
}
175+
176+
public ImpalaOutputFormat build() {
177+
checkNotNull(options, "No options supplied.");
178+
checkNotNull(fieldNames, "No fieldNames supplied.");
179+
return new ImpalaOutputFormat(
180+
options,
181+
fieldNames,
182+
keyFields,
183+
partitionFields,
184+
fieldTypes,
185+
flushMaxSize,
186+
flushIntervalMills,
187+
allReplace,
188+
updateMode,
189+
authMech,
190+
keytabPath,
191+
krb5confPath,
192+
principal);
193+
}
194+
}
195+
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public ImpalaSink() {
4545
}
4646

4747
@Override
48-
public JDBCUpsertOutputFormat getOutputFormat() {
48+
public ImpalaOutputFormat getOutputFormat() {
4949
JDBCOptions jdbcOptions = JDBCOptions.builder()
5050
.setDbUrl(getImpalaJdbcUrl())
5151
.setDialect(new ImpalaDialect(getFieldTypes(), primaryKeys))
@@ -54,7 +54,7 @@ public JDBCUpsertOutputFormat getOutputFormat() {
5454
.setTableName(tableName)
5555
.build();
5656

57-
return JDBCUpsertOutputFormat.builder()
57+
return ImpalaOutputFormat.impalaBuilder()
5858
.setOptions(jdbcOptions)
5959
.setFieldNames(fieldNames)
6060
.setFlushMaxSize(batchNum)
@@ -64,20 +64,21 @@ public JDBCUpsertOutputFormat getOutputFormat() {
6464
.setPartitionFields(impalaTableInfo.getPartitionFields())
6565
.setAllReplace(allReplace)
6666
.setUpdateMode(updateMode)
67+
.setAuthMech(impalaTableInfo.getAuthMech())
68+
.setKeytabPath(impalaTableInfo.getKeyTabFilePath())
69+
.setKrb5confPath(impalaTableInfo.getKrb5FilePath())
70+
.setPrincipal(impalaTableInfo.getPrincipal())
6771
.build();
6872
}
6973

70-
7174
public String getImpalaJdbcUrl() {
7275
Integer authMech = impalaTableInfo.getAuthMech();
7376
String newUrl = dbUrl;
7477
StringBuffer urlBuffer = new StringBuffer(dbUrl);
7578
if (authMech == EAuthMech.NoAuthentication.getType()) {
7679
return newUrl;
7780
} else if (authMech == EAuthMech.Kerberos.getType()) {
78-
String keyTabFilePath = impalaTableInfo.getKeyTabFilePath();
79-
String krb5FilePath = impalaTableInfo.getKrb5FilePath();
80-
String principal = impalaTableInfo.getPrincipal();
81+
8182
String krbRealm = impalaTableInfo.getKrbRealm();
8283
String krbHostFqdn = impalaTableInfo.getKrbHostFQDN();
8384
String krbServiceName = impalaTableInfo.getKrbServiceName();
@@ -89,16 +90,6 @@ public String getImpalaJdbcUrl() {
8990
);
9091
newUrl = urlBuffer.toString();
9192

92-
System.setProperty("java.security.krb5.conf", krb5FilePath);
93-
Configuration configuration = new Configuration();
94-
configuration.set("hadoop.security.authentication", "Kerberos");
95-
UserGroupInformation.setConfiguration(configuration);
96-
try {
97-
UserGroupInformation.loginUserFromKeytab(principal, keyTabFilePath);
98-
} catch (IOException e) {
99-
throw new RuntimeException("loginUserFromKeytab error ..", e);
100-
}
101-
10293
} else if (authMech == EAuthMech.UserName.getType()) {
10394
urlBuffer.append(";"
10495
.concat("AuthMech=3;")

0 commit comments

Comments
 (0)