Skip to content

Commit 7c2555f

Browse files
committed
[hotfix-36143][hbase]修复hbase 写入数据全为null时,table put 异常
1 parent b1eb882 commit 7c2555f

2 files changed

Lines changed: 19 additions & 4 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,12 @@ public static String buildDefaultDirty() {
140140
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
141141
*/
142142
public void close() {
143-
if (checkConsumer()) {
143+
if (consumer != null && checkConsumer()) {
144144
LOG.info("dirty consumer is closing ...");
145145
consumer.close();
146+
}
147+
148+
if (dirtyDataConsumer != null) {
146149
dirtyDataConsumer.shutdownNow();
147150
}
148151
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.LinkedList;
5050
import java.util.List;
5151
import java.util.Map;
52+
import java.util.Objects;
5253
import java.util.concurrent.ScheduledExecutorService;
5354
import java.util.concurrent.ScheduledFuture;
5455
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -223,12 +224,23 @@ public void writeBatchRecord(Row row) {
223224

224225
protected synchronized void dealBatchOperation(List<Row> records) {
225226
// A null in the result array means that the call for that action failed, even after retries.
226-
Object[] results = new Object[records.size()];
227+
Object[] results = null;
227228
try {
228229
List<Put> puts = new ArrayList<>();
229230
for (Row record : records) {
230-
puts.add(getPutByRow(record));
231+
Put put = getPutByRow(record);
232+
if (put == null || put.isEmpty()) {
233+
dirtyDataManager.execute();
234+
dirtyDataManager.collectDirtyData(
235+
record.toString(),
236+
"HBase put is empty, check record please!"
237+
);
238+
outDirtyRecords.inc();
239+
} else {
240+
puts.add(put);
241+
}
231242
}
243+
results = new Object[puts.size()];
232244
table.batch(puts, results);
233245

234246
// 打印结果
@@ -240,7 +252,7 @@ protected synchronized void dealBatchOperation(List<Row> records) {
240252
// ignore exception
241253
} finally {
242254
// 判断数据是否插入成功
243-
for (int i = 0; i < results.length; i++) {
255+
for (int i = 0; i < Objects.requireNonNull(results).length; i++) {
244256
if (results[i] instanceof Exception) {
245257
dirtyDataManager.execute();
246258
// 脏数据记录

0 commit comments

Comments
 (0)