
Commit 286a796

author: dapeng (committed)
keyParititon function
1 parent a83e544 commit 286a796

18 files changed

Lines changed: 216 additions & 25 deletions


core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java

Lines changed: 4 additions & 0 deletions
@@ -77,4 +77,8 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
         this.runtimeContext = runtimeContext;
     }
 
+    public SerializationSchema<Row> getSerializationSchema() {
+        return serializationSchema;
+    }
+
 }

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ public abstract class AbstractKafkaProducerFactory {
      * @param partitioner
      * @return
      */
-    public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner);
+    public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, String[] partitionKeys);
 
     protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
         return new SerializationMetricWrapper(createSerializationSchema(kafkaSinkTableInfo, typeInformation));
kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkPartition.java

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+package com.dtstack.flink.sql.sink.kafka;
+
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
+import java.util.Random;
+import org.apache.flink.util.Preconditions;
+
+public class CustomerFlinkPartition<T> extends FlinkFixedPartitioner<T> {
+    @Override
+    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
+        if (key == null) {
+            Random random = new Random();
+            return partitions[random.nextInt(1000) % partitions.length];
+        }
+        return partitions[key.hashCode() % partitions.length];
+    }
+}
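
A note on the key branch above: key is a byte[], and Java arrays inherit Object.hashCode(), so key.hashCode() reflects the array's identity rather than its contents. A content-based variant would look like the following standalone sketch (illustration only, not part of this commit; the class name is hypothetical):

import java.util.Arrays;

public class KeyPartitionSketch {
    public static int partitionForKey(byte[] key, int[] partitions) {
        // Arrays.hashCode hashes the byte contents; Math.floorMod keeps the index non-negative.
        int hash = Arrays.hashCode(key);
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        byte[] key = "order-42".getBytes();
        // Equal key contents always map to the same partition index.
        System.out.println(partitionForKey(key, partitions));
    }
}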
kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+package com.dtstack.flink.sql.sink.kafka;
+
+
+import com.dtstack.flink.sql.format.SerializationMetricWrapper;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.JsonRowSchemaConverter;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema<Row> {
+
+    private Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final long serialVersionUID = 1L;
+    private final SerializationMetricWrapper serializationMetricWrapper;
+    private String[] partitionKeys;
+    private ObjectMapper mapper = null;
+
+    public CustomerKeyedSerializationSchema(SerializationMetricWrapper serializationMetricWrapper, String[] partitionKeys) {
+        this.serializationMetricWrapper = serializationMetricWrapper;
+        this.partitionKeys = partitionKeys;
+        this.mapper = new ObjectMapper();
+    }
+
+    public byte[] serializeKey(Row element) {
+        if (partitionKeys == null || partitionKeys.length <= 0) {
+            return null;
+        }
+        SerializationSchema<Row> serializationSchema = serializationMetricWrapper.getSerializationSchema();
+        if (serializationSchema instanceof JsonRowSerializationSchema) {
+            return serializeJsonKey((JsonRowSerializationSchema) serializationSchema, element);
+        }
+        return null;
+    }
+
+    public byte[] serializeValue(Row element) {
+        return this.serializationMetricWrapper.serialize(element);
+    }
+
+    public String getTargetTopic(Row element) {
+        return null;
+    }
+
+    private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationSchema, Row element) {
+        try {
+            byte[] data = jsonRowSerializationSchema.serialize(element);
+            ObjectNode objectNode = mapper.readValue(data, ObjectNode.class);
+            StringBuilder sb = new StringBuilder();
+            for (String key : partitionKeys) {
+                if (objectNode.has(key)) {
+                    sb.append(objectNode.get(key));
+                }
+            }
+            return sb.toString().getBytes();
+        } catch (Exception e) {
+            log.error("serializeJsonKey error", e);
+        }
+        return null;
+
+    }
+}
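
To make the key construction above concrete, here is a small standalone sketch (illustration only; it uses plain Jackson instead of Flink's shaded copy, and the field names and values are assumed): for partition keys id and channel, the JSON text of each matching field is concatenated and the resulting string becomes the Kafka message key.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonKeySketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Assume the row serializes to this JSON value.
        byte[] data = "{\"id\":1,\"channel\":\"web\",\"pv\":3}".getBytes();
        ObjectNode objectNode = mapper.readValue(data, ObjectNode.class);

        StringBuilder sb = new StringBuilder();
        for (String key : new String[]{"id", "channel"}) {
            if (objectNode.has(key)) {
                // JsonNode.toString() is appended: 1 for the number, "web" (with quotes) for the string.
                sb.append(objectNode.get(key));
            }
        }
        byte[] messageKey = sb.toString().getBytes();
        System.out.println(new String(messageKey)); // 1"web"
    }
}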

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 3 additions & 0 deletions
@@ -49,6 +49,9 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
         kafkaSinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
         kafkaSinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
 
+        kafkaSinkTableInfo.setEnableKeyPartition(MathUtil.getString(props.get(KafkaSinkTableInfo.ENABLE_KEY_PARTITION_KEY.toLowerCase())));
+        kafkaSinkTableInfo.setPartitionKeys(MathUtil.getString(props.get(KafkaSinkTableInfo.PARTITION_KEY.toLowerCase())));
+
         Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
         kafkaSinkTableInfo.setParallelism(parallelism);
 
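For orientation, a minimal sketch of how the two new sink options would reach this parser (the property values are assumed for illustration; the parser lower-cases the constant names, so the props map is keyed in lower case):

import java.util.HashMap;
import java.util.Map;

public class SinkPropsSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("enablekeypartition", "true");  // switches the sink to the key-based partitioner
        props.put("partitionkeys", "id,channel"); // comma-separated fields whose values form the message key
        System.out.println(props);
    }
}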
kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 24 additions & 0 deletions
@@ -41,6 +41,10 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
 
     public static final String TYPE_KEY = "type";
 
+    public static final String ENABLE_KEY_PARTITION_KEY = "enableKeyPartition";
+
+    public static final String PARTITION_KEY = "partitionKeys";
+
     private String bootstrapServers;
 
     public Map<String, String> kafkaParam = new HashMap<String, String>();
@@ -51,6 +55,10 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
 
     private String fieldDelimiter;
 
+    private String enableKeyPartition;
+
+    private String partitionKeys;
+
     public void addKafkaParam(String key, String value) {
         kafkaParam.put(key, value);
     }
@@ -104,4 +112,20 @@ public boolean check() {
         return false;
     }
 
+    public String getEnableKeyPartition() {
+        return enableKeyPartition;
+    }
+
+    public void setEnableKeyPartition(String enableKeyPartition) {
+        this.enableKeyPartition = enableKeyPartition;
+    }
+
+    public String getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    public void setPartitionKeys(String partitionKeys) {
+        this.partitionKeys = partitionKeys;
+    }
+
 }

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java

Lines changed: 2 additions & 2 deletions
@@ -39,8 +39,8 @@ public class KafkaProducer extends FlinkKafkaProducer<Row> {
 
     private SerializationMetricWrapper serializationMetricWrapper;
 
-    public KafkaProducer(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner) {
-        super(topicId, serializationSchema, producerConfig, customPartitioner);
+    public KafkaProducer(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner, String[] parititonKeys) {
+        super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, parititonKeys), producerConfig, customPartitioner);
         this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema;
     }
 
kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducerFactory.java

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@
 public class KafkaProducerFactory extends AbstractKafkaProducerFactory {
 
     @Override
-    public RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner) {
-        return new KafkaProducer(kafkaSinkTableInfo.getTopic(), createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation), properties, partitioner);
+    public RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, String[] partitionKeys) {
+        return new KafkaProducer(kafkaSinkTableInfo.getTopic(), createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation), properties, partitioner, partitionKeys);
     }
 }

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 19 additions & 2 deletions
@@ -21,6 +21,7 @@
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import com.dtstack.flink.sql.table.TargetTableInfo;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -63,6 +64,8 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
     /** Partitioner to select Kafka partition for each item. */
     protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
+    private String[] partitionKeys;
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         KafkaSinkTableInfo kafkaSinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
@@ -74,7 +77,8 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
             properties.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
         }
-        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
+        this.partitioner = Optional.of(getFlinkPartitioner(kafkaSinkTableInfo));
+        this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);
         this.fieldNames = kafkaSinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafkaSinkTableInfo.getFields().length];
         for (int i = 0; i < kafkaSinkTableInfo.getFieldClasses().length; i++) {
@@ -93,7 +97,7 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
             this.parallelism = parallelism;
         }
 
-        this.flinkKafkaProducer = (FlinkKafkaProducer<Row>) new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner);
+        this.flinkKafkaProducer = (FlinkKafkaProducer<Row>) new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner, partitionKeys);
         return this;
     }
 
@@ -133,4 +137,17 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
         this.fieldTypes = fieldTypes;
         return this;
     }
+    private FlinkKafkaPartitioner getFlinkPartitioner(KafkaSinkTableInfo kafkaSinkTableInfo) {
+        if ("true".equalsIgnoreCase(kafkaSinkTableInfo.getEnableKeyPartition())) {
+            return new CustomerFlinkPartition<>();
+        }
+        return new FlinkFixedPartitioner<>();
+    }
+
+    private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
+        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
+            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
+        }
+        return null;
+    }
 }
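
Taken together, the two helpers above decide the sink's behaviour: with enableKeyPartition set to true the key-aware CustomerFlinkPartition is used, otherwise the previous FlinkFixedPartitioner behaviour is kept. A tiny standalone sketch of that decision with assumed table settings (illustration only; the class name is hypothetical):

import org.apache.commons.lang3.StringUtils;

public class KafkaSinkWiringSketch {
    public static void main(String[] args) {
        // Assumed table settings, for illustration.
        String enableKeyPartition = "true";
        String partitionKeysProp = "id,channel";

        boolean useKeyPartitioner = "true".equalsIgnoreCase(enableKeyPartition);
        String[] partitionKeys = StringUtils.isNotBlank(partitionKeysProp)
                ? StringUtils.split(partitionKeysProp, ',')
                : null;

        // useKeyPartitioner == true  -> CustomerFlinkPartition routes records by message key
        // useKeyPartitioner == false -> FlinkFixedPartitioner keeps the old subtask-to-partition mapping
        System.out.println(useKeyPartitioner + " -> " + String.join("|", partitionKeys));
    }
}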

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java

Lines changed: 2 additions & 2 deletions
@@ -39,8 +39,8 @@ public class KafkaProducer09 extends FlinkKafkaProducer09<Row> {
 
     private SerializationMetricWrapper serializationMetricWrapper;
 
-    public KafkaProducer09(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner) {
-        super(topicId, serializationSchema, producerConfig, customPartitioner.orElse(null));
+    public KafkaProducer09(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<Row>> customPartitioner, String[] partitionKeys) {
+        super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, partitionKeys), producerConfig, customPartitioner.orElse(null));
         this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema;
     }
 