import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.format.SerializationMetricWrapper;
+import com.dtstack.flink.sql.sink.kafka.serialization.AvroTuple2SerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.CsvTupleSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.JsonTupleSerializationSchema;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.AvroRowSerializationSchema;
-import org.apache.flink.formats.csv.CsvRowSerializationSchema;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
@@ -51,42 +52,38 @@ public abstract class AbstractKafkaProducerFactory {
     * @param partitioner
     * @return
     */
-    public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, String[] partitionKeys);
+    public abstract RichSinkFunction<Tuple2<Boolean,Row>> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean,Row>> typeInformation,
+                                                                              Properties properties, Optional<FlinkKafkaPartitioner<Tuple2<Boolean,Row>>> partitioner, String[] partitionKeys);

-    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
-        return new SerializationMetricWrapper(createSerializationSchema(kafkaSinkTableInfo, typeInformation));
+    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean,Row>> typeInformation) {
+        SerializationSchema<Tuple2<Boolean,Row>> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
+        return new SerializationMetricWrapper(serializationSchema);
     }

-    private SerializationSchema<Row> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
-        SerializationSchema<Row> serializationSchema = null;
+    private SerializationSchema<Tuple2<Boolean, Row>> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation) {
+        SerializationSchema<Tuple2<Boolean, Row>> serializationSchema = null;
         if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
-
             if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
-                serializationSchema = new JsonRowSerializationSchema.Builder(kafkaSinkTableInfo.getSchemaString()).build();
+                serializationSchema = new JsonTupleSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                serializationSchema = new JsonRowSerializationSchema.Builder(typeInformation).build();
+                serializationSchema = new JsonTupleSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
             } else {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema)or TypeInformation<Row>");
             }
-
         } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
-
             if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
             }
-
-            final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInformation);
+            final CsvTupleSerializationSchema.Builder serSchemaBuilder = new CsvTupleSerializationSchema.Builder(typeInformation);
             serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]);
-            serializationSchema = serSchemaBuilder.build();
+            serSchemaBuilder.setUpdateMode(kafkaSinkTableInfo.getUpdateMode());

+            serializationSchema = serSchemaBuilder.build();
         } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
-
             if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString");
             }
-
-            serializationSchema = new AvroRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
-
+            serializationSchema = new AvroTuple2SerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
         }

         if (null == serializationSchema) {
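The diff switches the sink from plain Row records to Flink's retract-stream representation, Tuple2<Boolean, Row>, where the Boolean flag marks the record as an add (true) or a retract (false), and passes the table's update mode down to the new Tuple2 serializers. As a rough illustration only, and not the project's actual JsonTupleSerializationSchema/CsvTupleSerializationSchema/AvroTuple2SerializationSchema, the following minimal sketch wraps an existing SerializationSchema<Row> and simply drops retract messages; how the real classes honor the configured update mode is assumed, not shown:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

/**
 * Hypothetical sketch: adapts a plain Row serializer to the Tuple2<Boolean, Row>
 * records of a retract stream. The Boolean flag is true for an add/update
 * message and false for a retract message.
 */
public class RetractAwareSerializationSchema implements SerializationSchema<Tuple2<Boolean, Row>> {

    private final SerializationSchema<Row> rowSerializer;

    public RetractAwareSerializationSchema(SerializationSchema<Row> rowSerializer) {
        this.rowSerializer = rowSerializer;
    }

    @Override
    public byte[] serialize(Tuple2<Boolean, Row> element) {
        // true  -> add message: serialize the wrapped Row as usual
        // false -> retract message: emit nothing here; a real implementation
        //          might instead write a tombstone or a delete marker,
        //          depending on the configured update mode
        return element.f0 ? rowSerializer.serialize(element.f1) : null;
    }
}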