Skip to content

Commit a8f4afa

Browse files
committed
[fix] sink add parallelism
1 parent 5b1a824 commit a8f4afa

2 files changed

Lines changed: 5 additions & 3 deletions

File tree

  • kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table
  • kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5757
kafkaSinkTableInfo.setPartitionKeys(MathUtil.getString(props.get(KafkaSinkTableInfo.PARTITION_KEY.toLowerCase())));
5858
kafkaSinkTableInfo.setUpdateMode(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.UPDATE_KEY.toLowerCase(), EUpdateMode.APPEND.name())));
5959

60-
Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
60+
Integer parallelism = MathUtil.getIntegerVal(props.getOrDefault(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase(), 1));
6161
kafkaSinkTableInfo.setParallelism(parallelism);
6262

6363
for (String key : props.keySet()) {

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.flink.types.Row;
1818

1919
import java.io.Serializable;
20+
import java.util.Objects;
2021

2122
public class KuduSink implements RetractStreamTableSink<Row>, Serializable, IStreamSinkGener<KuduSink> {
2223

@@ -36,7 +37,7 @@ public class KuduSink implements RetractStreamTableSink<Row>, Serializable, IStr
3637

3738
private Integer defaultSocketReadTimeoutMs;
3839

39-
private int parallelism = -1;
40+
private int parallelism = 1;
4041

4142
private String principal;
4243
private String keytab;
@@ -56,7 +57,8 @@ public KuduSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
5657
this.keytab = kuduTableInfo.getKeytab();
5758
this.krb5conf = kuduTableInfo.getKrb5conf();
5859
this.enableKrb = kuduTableInfo.isEnableKrb();
59-
this.parallelism = kuduTableInfo.getParallelism();
60+
this.parallelism = Objects.isNull(kuduTableInfo.getParallelism()) ?
61+
parallelism : kuduTableInfo.getParallelism();
6062

6163
return this;
6264
}

0 commit comments

Comments
 (0)