Skip to content

Commit 978a837

Browse files
committed
[feat] impala支持update mode
1 parent abac28c commit 978a837

2 files changed

Lines changed: 43 additions & 10 deletions

File tree

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.util.JDBCUtils;
2626
import com.dtstack.flink.sql.util.KrbUtils;
2727
import com.google.common.collect.Maps;
28+
import org.apache.commons.collections.CollectionUtils;
2829
import org.apache.flink.api.java.tuple.Tuple2;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.types.Row;
@@ -40,6 +41,7 @@
4041
import java.sql.PreparedStatement;
4142
import java.sql.SQLException;
4243
import java.util.ArrayList;
44+
import java.util.Arrays;
4345
import java.util.List;
4446
import java.util.Map;
4547
import java.util.Objects;
@@ -107,13 +109,13 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
107109
public List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfoList;
108110

109111
// partition field of static partition which matched by ${field}
110-
private List<String> staticPartitionField = new ArrayList<>();
112+
private final List<String> staticPartitionField = new ArrayList<>();
111113

112114
// static partition sql like 'INSERT INTO tableName(field1, field2) PARTITION(pt=xx) VALUES(?, ?)'
113-
private String staticPartitionSql;
115+
private String staticPartitionSql = "";
114116

115117
private transient ScheduledExecutorService scheduler;
116-
private transient ScheduledFuture scheduledFuture;
118+
private transient ScheduledFuture<?> scheduledFuture;
117119

118120
@Override
119121
public void configure(Configuration parameters) {
@@ -170,6 +172,15 @@ private void openJdbc() {
170172
try {
171173
connection = DriverManager.getConnection(dbUrl, userName, password);
172174
connection.setAutoCommit(false);
175+
176+
//update mode
177+
if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
178+
statement = connection.prepareStatement(
179+
buildUpdateSql(schema, tableName, fieldList, primaryKeys)
180+
);
181+
return;
182+
}
183+
173184
// kudu
174185
if (storeType.equalsIgnoreCase(KUDU_TYPE)) {
175186
statement = connection.prepareStatement(
@@ -235,7 +246,7 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
235246
}
236247

237248
// build static partition statement from row data
238-
if (!staticPartitionSql.isEmpty()) {
249+
if (Objects.isNull(statement) || !staticPartitionSql.isEmpty()) {
239250
statement = connection.prepareStatement(
240251
staticPartitionSql.replace(PARTITION_CONDITION,
241252
buildStaticPartitionCondition(valueMap, staticPartitionField))
@@ -247,7 +258,12 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
247258
rowValue.setField(i, copyRow.getField(i));
248259
}
249260

250-
setRowToStatement(statement, fieldTypeList, rowValue);
261+
if (updateMode.equalsIgnoreCase(UPDATE_MODE)) {
262+
setRowToStatement(statement, fieldTypeList, rowValue, primaryKeys.stream().mapToInt(fieldList::indexOf).toArray());
263+
} else {
264+
setRowToStatement(statement, fieldTypeList, rowValue);
265+
}
266+
251267
statement.addBatch();
252268

253269
if (batchCount.incrementAndGet() > batchSize) {
@@ -262,6 +278,10 @@ private void setRowToStatement(PreparedStatement statement, List<String> fieldTy
262278
JDBCTypeConvertUtils.setRecordToStatement(statement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypeList), row);
263279
}
264280

281+
private void setRowToStatement(PreparedStatement statement, List<String> fieldTypeList, Row row, int[] pkFields) throws SQLException {
282+
JDBCTypeConvertUtils.setRecordToStatement(statement, JDBCTypeConvertUtils.getSqlTypeFromFieldType(fieldTypeList), row, pkFields);
283+
}
284+
265285
@Override
266286
public void close() throws IOException {
267287
if (closed) {
@@ -402,8 +422,19 @@ private String buildDynamicInsertSql(String schema, String tableName, List<Strin
402422
*
403423
* @return UPDATE tableName SET setCondition WHERE whereCondition
404424
*/
405-
private String buildUpdateSql(String schema, String tableName, String[] fieldNames, String[] partitionFields, Map<String, Object> replaceField) {
406-
return "update tableName set name='王五' where id='003";
425+
private String buildUpdateSql(String schema, String tableName, List<String> fieldNames, List<String> primaryKeys) {
426+
//跳过primary key字段
427+
String setClause = fieldNames.stream()
428+
.filter(f -> !CollectionUtils.isNotEmpty(primaryKeys) || !primaryKeys.contains(f))
429+
.map(f -> quoteIdentifier(f) + "=?")
430+
.collect(Collectors.joining(", "));
431+
432+
String conditionClause = primaryKeys.stream()
433+
.map(f -> quoteIdentifier(f) + "=?")
434+
.collect(Collectors.joining(" AND "));
435+
436+
return "UPDATE " + (Objects.isNull(schema) ? "" : quoteIdentifier(schema) + ".")
437+
+ quoteIdentifier(tableName) + " SET " + setClause + " WHERE " + conditionClause;
407438
}
408439

409440
public String quoteIdentifier(String identifier) {

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/table/ImpalaTableInfo.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.common.base.Preconditions;
2424
import org.apache.commons.lang3.StringUtils;
2525

26+
import java.util.Objects;
27+
2628
/**
2729
* Date: 2020/10/14
2830
* Company: www.dtstack.com
@@ -263,15 +265,15 @@ public boolean check() {
263265
Preconditions.checkNotNull(url, "impala field of URL is required");
264266
Preconditions.checkNotNull(tableName, "impala field of tableName is required");
265267

266-
if (null != batchSize) {
268+
if (Objects.nonNull(batchSize)) {
267269
Preconditions.checkArgument(batchSize <= MAX_BATCH_SIZE, "batchSize must be less than " + MAX_BATCH_SIZE);
268270
}
269271

270272
if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) {
271-
Preconditions.checkArgument(null != getPrimaryKeys() && getPrimaryKeys().size() > 0, "updateMode mode primary is required");
273+
Preconditions.checkArgument(Objects.nonNull(getPrimaryKeys()) && getPrimaryKeys().size() > 0, "updateMode mode primary is required");
272274
}
273275

274-
if (null != getPrimaryKeys()) {
276+
if (Objects.nonNull(getPrimaryKeys())) {
275277
getPrimaryKeys().forEach(pk -> {
276278
Preconditions.checkArgument(getFieldList().contains(pk), "primary key " + pk + " not found in sink table field");
277279
});

0 commit comments

Comments
 (0)