|
28 | 28 | import org.apache.flink.configuration.Configuration; |
29 | 29 | import org.apache.flink.types.Row; |
30 | 30 | import org.apache.flink.util.Preconditions; |
31 | | -import org.apache.hadoop.hbase.*; |
| 31 | +import org.apache.hadoop.hbase.AuthUtil; |
| 32 | +import org.apache.hadoop.hbase.ChoreService; |
| 33 | +import org.apache.hadoop.hbase.HBaseConfiguration; |
| 34 | +import org.apache.hadoop.hbase.ScheduledChore; |
| 35 | +import org.apache.hadoop.hbase.TableName; |
32 | 36 | import org.apache.hadoop.hbase.client.Connection; |
33 | 37 | import org.apache.hadoop.hbase.client.ConnectionFactory; |
34 | 38 | import org.apache.hadoop.hbase.client.Delete; |
|
42 | 46 | import java.io.File; |
43 | 47 | import java.io.IOException; |
44 | 48 | import java.security.PrivilegedAction; |
| 49 | +import java.util.LinkedHashMap; |
| 50 | +import java.util.LinkedList; |
45 | 51 | import java.util.List; |
46 | 52 | import java.util.Map; |
47 | | -import java.util.Set; |
48 | 53 |
|
49 | 54 | /** |
50 | 55 | * @author: jingzhen@dtstack.com |
@@ -129,21 +134,18 @@ private void openKerberosConn() throws Exception { |
129 | 134 |
|
130 | 135 | UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, clientPrincipal, clientKeytabFile); |
131 | 136 | org.apache.hadoop.conf.Configuration finalConf = conf; |
132 | | - conn = userGroupInformation.doAs(new PrivilegedAction<Connection>() { |
133 | | - @Override |
134 | | - public Connection run() { |
135 | | - try { |
136 | | - ScheduledChore authChore = AuthUtil.getAuthChore(finalConf); |
137 | | - if (authChore != null) { |
138 | | - choreService = new ChoreService("hbaseKerberosSink"); |
139 | | - choreService.scheduleChore(authChore); |
140 | | - } |
141 | | - |
142 | | - return ConnectionFactory.createConnection(finalConf); |
143 | | - } catch (IOException e) { |
144 | | - LOG.error("Get connection fail with config:{}", finalConf); |
145 | | - throw new RuntimeException(e); |
| 137 | + conn = userGroupInformation.doAs((PrivilegedAction<Connection>) () -> { |
| 138 | + try { |
| 139 | + ScheduledChore authChore = AuthUtil.getAuthChore(finalConf); |
| 140 | + if (authChore != null) { |
| 141 | + choreService = new ChoreService("hbaseKerberosSink"); |
| 142 | + choreService.scheduleChore(authChore); |
146 | 143 | } |
| 144 | + |
| 145 | + return ConnectionFactory.createConnection(finalConf); |
| 146 | + } catch (IOException e) { |
| 147 | + LOG.error("Get connection fail with config:{}", finalConf); |
| 148 | + throw new RuntimeException(e); |
147 | 149 | } |
148 | 150 | }); |
149 | 151 | } |
@@ -355,8 +357,8 @@ public HbaseOutputFormat finish() { |
355 | 357 | String[] qualifiers = new String[format.columnNames.length]; |
356 | 358 |
|
357 | 359 | if (format.columnNameFamily != null) { |
358 | | - Set<String> keySet = format.columnNameFamily.keySet(); |
359 | | - String[] columns = keySet.toArray(new String[keySet.size()]); |
| 360 | + List<String> keyList = new LinkedList<>(format.columnNameFamily.keySet()); |
| 361 | + String[] columns = keyList.toArray(new String[0]); |
360 | 362 | for (int i = 0; i < columns.length; ++i) { |
361 | 363 | String col = columns[i]; |
362 | 364 | String[] part = col.split(":"); |
|
0 commit comments