Skip to content

Commit 895df3c

Browse files
committed
[hotfix-36143][hbase]修复hbase 写入数据全为null时,table put 异常
1 parent 40c2c0f commit 895df3c

1 file changed

Lines changed: 25 additions & 16 deletions

File tree

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.LinkedList;
4949
import java.util.List;
5050
import java.util.Map;
51+
import java.util.Objects;
5152
import java.util.concurrent.ScheduledExecutorService;
5253
import java.util.concurrent.ScheduledFuture;
5354
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -199,7 +200,7 @@ private void openKerberosConn() throws Exception {
199200
@Override
200201
public void writeRecord(Tuple2<Boolean, Row> record) {
201202
if (record.f0) {
202-
if (this.batchSize != 0) {
203+
if (this.batchSize > 1) {
203204
writeBatchRecord(record.f1);
204205
} else {
205206
dealInsert(record.f1);
@@ -217,30 +218,34 @@ public void writeBatchRecord(Row row) {
217218

218219
protected synchronized void dealBatchOperation(List<Row> records) {
219220
// A null in the result array means that the call for that action failed, even after retries.
220-
Object[] results = new Object[records.size()];
221+
Object[] results = null;
221222
try {
222223
List<Put> puts = new ArrayList<>();
223224
for (Row record : records) {
224-
puts.add(getPutByRow(record));
225+
Put put = getPutByRow(record);
226+
if (put == null || put.isEmpty()) {
227+
dealError(
228+
record,
229+
"HBase put is empty, please check the record.");
230+
} else {
231+
puts.add(put);
232+
}
225233
}
234+
results = new Object[puts.size()];
226235
table.batch(puts, results);
227236

228237
// 打印结果
229238
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
230239
// 只打印最后一条数据
231240
LOG.info(records.get(records.size() - 1).toString());
232241
}
233-
} catch (IOException | InterruptedException ignored) {
242+
} catch (IOException | InterruptedException e) {
243+
// ignore exception
234244
} finally {
235245
// 判断数据是否插入成功
236-
for (int i = 0; i < results.length; i++) {
246+
for (int i = 0; i < Objects.requireNonNull(results).length; i++) {
237247
if (results[i] instanceof Exception) {
238-
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
239-
LOG.error("Get dirty data: {}", records.get(i).toString());
240-
LOG.error("Error cause: " + ExceptionTrace.traceOriginalCause((Exception) results[i]));
241-
}
242-
// 脏数据记录
243-
outDirtyRecords.inc();
248+
dealError(records.get(i), ExceptionTrace.traceOriginalCause((Exception) results[i]));
244249
} else {
245250
// 输出结果条数记录
246251
outRecords.inc();
@@ -262,11 +267,7 @@ protected void dealInsert(Row record) {
262267
try {
263268
table.put(put);
264269
} catch (Exception e) {
265-
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
266-
LOG.error("Get dirty data: {}", record.toString());
267-
LOG.error("Error cause: " + ExceptionTrace.traceOriginalCause(e));
268-
}
269-
outDirtyRecords.inc();
270+
dealError(record, ExceptionTrace.traceOriginalCause(e));
270271
}
271272

272273
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
@@ -275,6 +276,14 @@ protected void dealInsert(Row record) {
275276
outRecords.inc();
276277
}
277278

279+
private void dealError(Row record, String cause) {
280+
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
281+
LOG.error("Get dirty data: {}", record.toString());
282+
LOG.error("Error cause: " + cause);
283+
}
284+
outDirtyRecords.inc();
285+
}
286+
278287
private Put getPutByRow(Row record) {
279288
String rowKey = buildRowKey(record);
280289
if (StringUtils.isEmpty(rowKey)) {

0 commit comments

Comments
 (0)