2222import com .dtstack .flink .sql .sink .kafka .table .KafkaSinkTableInfo ;
2323import com .dtstack .flink .sql .table .TargetTableInfo ;
2424import org .apache .flink .api .common .serialization .SerializationSchema ;
25- import org .apache .flink .api .common .serialization .TypeInformationSerializationSchema ;
2625import org .apache .flink .api .common .typeinfo .TypeInformation ;
26+ import org .apache .flink .api .java .tuple .Tuple2 ;
2727import org .apache .flink .api .java .typeutils .RowTypeInfo ;
28- import org .apache .flink .formats . json . JsonRowSerializationSchema ;
28+ import org .apache .flink .api . java . typeutils . TupleTypeInfo ;
2929import org .apache .flink .streaming .api .datastream .DataStream ;
30-
31- import org .apache .flink .streaming .connectors .kafka .Kafka011TableSink ;
3230import org .apache .flink .streaming .connectors .kafka .KafkaTableSinkBase ;
3331import org .apache .flink .streaming .connectors .kafka .partitioner .FlinkFixedPartitioner ;
3432import org .apache .flink .streaming .connectors .kafka .partitioner .FlinkKafkaPartitioner ;
3533import org .apache .flink .table .api .TableSchema ;
36- import org .apache .flink .table .sinks .AppendStreamTableSink ;
34+ import org .apache .flink .table .sinks .RetractStreamTableSink ;
3735import org .apache .flink .table .sinks .TableSink ;
3836import org .apache .flink .types .Row ;
3937
5048 * @modifyer maqi
5149 *
5250 */
53- public class KafkaSink implements AppendStreamTableSink <Row >, IStreamSinkGener <KafkaSink > {
51+ public class KafkaSink implements RetractStreamTableSink <Row >, IStreamSinkGener <KafkaSink > {
5452
5553 protected String [] fieldNames ;
5654
5755 protected TypeInformation <?>[] fieldTypes ;
5856
59- /** The schema of the table. */
60- private TableSchema schema ;
61-
62- /** The Kafka topic to write to. */
6357 protected String topic ;
6458
65- /** Properties for the Kafka producer. */
6659 protected Properties properties ;
6760
6861 /** Serialization schema for encoding records to Kafka. */
6962 protected SerializationSchema serializationSchema ;
7063
64+ /** The schema of the table. */
65+ private TableSchema schema ;
66+
7167 /** Partitioner to select Kafka partition for each item. */
7268 protected Optional <FlinkKafkaPartitioner <Row >> partitioner ;
7369
70+
7471 @ Override
7572 public KafkaSink genStreamSink (TargetTableInfo targetTableInfo ) {
76- KafkaSinkTableInfo kafka011SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
77- this .topic = kafka011SinkTableInfo .getTopic ();
73+ KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
74+ this .topic = kafka11SinkTableInfo .getTopic ();
7875
79- Properties props = new Properties ();
80- props .setProperty ("bootstrap.servers" , kafka011SinkTableInfo .getBootstrapServers ());
76+ properties = new Properties ();
77+ properties .setProperty ("bootstrap.servers" , kafka11SinkTableInfo .getBootstrapServers ());
8178
82- for (String key : kafka011SinkTableInfo .getKafkaParamKeys ()) {
83- props .setProperty (key , kafka011SinkTableInfo .getKafkaParam (key ));
79+ for (String key : kafka11SinkTableInfo .getKafkaParamKeys ()) {
80+ properties .setProperty (key , kafka11SinkTableInfo .getKafkaParam (key ));
8481 }
85- this .properties = props ;
8682 this .partitioner = Optional .of (new FlinkFixedPartitioner <>());
87- this .fieldNames = kafka011SinkTableInfo .getFields ();
88- TypeInformation [] types = new TypeInformation [kafka011SinkTableInfo .getFields ().length ];
89- for (int i = 0 ; i < kafka011SinkTableInfo .getFieldClasses ().length ; i ++){
90- types [i ] = TypeInformation .of (kafka011SinkTableInfo .getFieldClasses ()[i ]);
83+ this .fieldNames = kafka11SinkTableInfo .getFields ();
84+ TypeInformation [] types = new TypeInformation [kafka11SinkTableInfo .getFields ().length ];
85+ for (int i = 0 ; i < kafka11SinkTableInfo .getFieldClasses ().length ; i ++) {
86+ types [i ] = TypeInformation .of (kafka11SinkTableInfo .getFieldClasses ()[i ]);
9187 }
9288 this .fieldTypes = types ;
9389
@@ -97,15 +93,17 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
9793 }
9894 this .schema = schemaBuilder .build ();
9995
100- //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
101- if ("json" .equalsIgnoreCase (kafka011SinkTableInfo .getSinkDataType ())) {
102- this .serializationSchema = new CustomerJsonRowSerializationSchema (getOutputType ());
103- }
96+ this .serializationSchema = new CustomerJsonRowSerializationSchema (getOutputType ().getTypeAt (1 ));
10497 return this ;
10598 }
10699
107100 @ Override
108- public void emitDataStream (DataStream <Row > dataStream ) {
101+ public TypeInformation <Row > getRecordType () {
102+ return new RowTypeInfo (fieldTypes , fieldNames );
103+ }
104+
105+ @ Override
106+ public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
109107 KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink (
110108 schema ,
111109 topic ,
@@ -114,12 +112,17 @@ public void emitDataStream(DataStream<Row> dataStream) {
114112 serializationSchema
115113 );
116114
117- kafkaTableSink .emitDataStream (dataStream );
115+
116+ DataStream <Row > ds = dataStream .map ((Tuple2 <Boolean , Row > record ) -> {
117+ return record .f1 ;
118+ }).returns (getOutputType ().getTypeAt (1 ));
119+
120+ kafkaTableSink .emitDataStream (ds );
118121 }
119122
120123 @ Override
121- public TypeInformation < Row > getOutputType () {
122- return new RowTypeInfo (fieldTypes , fieldNames );
124+ public TupleTypeInfo < Tuple2 < Boolean , Row > > getOutputType () {
125+ return new TupleTypeInfo ( org . apache . flink . table . api . Types . BOOLEAN (), new RowTypeInfo (fieldTypes , fieldNames ) );
123126 }
124127
125128 @ Override
@@ -133,10 +136,9 @@ public TypeInformation<?>[] getFieldTypes() {
133136 }
134137
135138 @ Override
136- public TableSink <Row > configure (String [] fieldNames , TypeInformation <?>[] fieldTypes ) {
139+ public TableSink <Tuple2 < Boolean , Row > > configure (String [] fieldNames , TypeInformation <?>[] fieldTypes ) {
137140 this .fieldNames = fieldNames ;
138141 this .fieldTypes = fieldTypes ;
139142 return this ;
140143 }
141-
142- }
144+ }