2222import com .dtstack .flink .sql .sink .kafka .table .KafkaSinkTableInfo ;
2323import com .dtstack .flink .sql .table .TargetTableInfo ;
2424import org .apache .flink .api .common .serialization .SerializationSchema ;
25- import org .apache .flink .api .common .serialization .TypeInformationSerializationSchema ;
2625import org .apache .flink .api .common .typeinfo .TypeInformation ;
26+ import org .apache .flink .api .java .tuple .Tuple2 ;
2727import org .apache .flink .api .java .typeutils .RowTypeInfo ;
28- import org .apache .flink .formats . json . JsonRowSerializationSchema ;
28+ import org .apache .flink .api . java . typeutils . TupleTypeInfo ;
2929import org .apache .flink .streaming .api .datastream .DataStream ;
30-
31- import org .apache .flink .streaming .connectors .kafka .Kafka011TableSink ;
3230import org .apache .flink .streaming .connectors .kafka .KafkaTableSinkBase ;
3331import org .apache .flink .streaming .connectors .kafka .partitioner .FlinkFixedPartitioner ;
3432import org .apache .flink .streaming .connectors .kafka .partitioner .FlinkKafkaPartitioner ;
3533import org .apache .flink .table .api .TableSchema ;
36- import org .apache .flink .table .sinks .AppendStreamTableSink ;
34+ import org .apache .flink .table .sinks .RetractStreamTableSink ;
3735import org .apache .flink .table .sinks .TableSink ;
3836import org .apache .flink .types .Row ;
3937
5048 * @modifyer maqi
5149 *
5250 */
53- public class KafkaSink implements AppendStreamTableSink <Row >, IStreamSinkGener <KafkaSink > {
51+ public class KafkaSink implements RetractStreamTableSink <Row >, IStreamSinkGener <KafkaSink > {
5452
5553 protected String [] fieldNames ;
5654
5755 protected TypeInformation <?>[] fieldTypes ;
5856
59- /** The schema of the table. */
60- private TableSchema schema ;
61-
62- /** The Kafka topic to write to. */
6357 protected String topic ;
6458
65- /** Properties for the Kafka producer. */
6659 protected Properties properties ;
6760
6861 /** Serialization schema for encoding records to Kafka. */
6962 protected SerializationSchema serializationSchema ;
7063
64+ /** The schema of the table. */
65+ private TableSchema schema ;
66+
7167 /** Partitioner to select Kafka partition for each item. */
7268 protected Optional <FlinkKafkaPartitioner <Row >> partitioner ;
7369
70+
7471 @ Override
7572 public KafkaSink genStreamSink (TargetTableInfo targetTableInfo ) {
76- KafkaSinkTableInfo kafka011SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
77- this .topic = kafka011SinkTableInfo .getTopic ();
73+ KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
74+ this .topic = kafka11SinkTableInfo .getTopic ();
7875
79- Properties props = new Properties ();
80- props .setProperty ("bootstrap.servers" , kafka011SinkTableInfo .getBootstrapServers ());
76+ properties = new Properties ();
77+ properties .setProperty ("bootstrap.servers" , kafka11SinkTableInfo .getBootstrapServers ());
8178
82- for (String key : kafka011SinkTableInfo .getKafkaParamKeys ()) {
83- props .setProperty (key , kafka011SinkTableInfo .getKafkaParam (key ));
79+ for (String key : kafka11SinkTableInfo .getKafkaParamKeys ()) {
80+ properties .setProperty (key , kafka11SinkTableInfo .getKafkaParam (key ));
8481 }
85- this .properties = props ;
8682 this .partitioner = Optional .of (new FlinkFixedPartitioner <>());
87- this .fieldNames = kafka011SinkTableInfo .getFields ();
88- TypeInformation [] types = new TypeInformation [kafka011SinkTableInfo .getFields ().length ];
89- for (int i = 0 ; i < kafka011SinkTableInfo .getFieldClasses ().length ; i ++){
90- types [i ] = TypeInformation .of (kafka011SinkTableInfo .getFieldClasses ()[i ]);
83+ this .fieldNames = kafka11SinkTableInfo .getFields ();
84+ TypeInformation [] types = new TypeInformation [kafka11SinkTableInfo .getFields ().length ];
85+ for (int i = 0 ; i < kafka11SinkTableInfo .getFieldClasses ().length ; i ++) {
86+ types [i ] = TypeInformation .of (kafka11SinkTableInfo .getFieldClasses ()[i ]);
9187 }
9288 this .fieldTypes = types ;
9389
@@ -96,30 +92,34 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
9692 schemaBuilder .field (fieldNames [i ], fieldTypes [i ]);
9793 }
9894 this .schema = schemaBuilder .build ();
99-
100- //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
101- if ("json" .equalsIgnoreCase (kafka011SinkTableInfo .getSinkDataType ())) {
102- this .serializationSchema = new CustomerJsonRowSerializationSchema (getOutputType ());
103- }
95+ this .serializationSchema = new CustomerJsonRowSerializationSchema (getOutputType ().getTypeAt (1 ));
10496 return this ;
10597 }
10698
10799 @ Override
108- public void emitDataStream (DataStream <Row > dataStream ) {
100+ public TypeInformation <Row > getRecordType () {
101+ return new RowTypeInfo (fieldTypes , fieldNames );
102+ }
103+
104+ @ Override
105+ public void emitDataStream (DataStream <Tuple2 <Boolean , Row >> dataStream ) {
109106 KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink (
110107 schema ,
111108 topic ,
112109 properties ,
113110 partitioner ,
114111 serializationSchema
115112 );
113+ DataStream <Row > ds = dataStream .map ((Tuple2 <Boolean , Row > record ) -> {
114+ return record .f1 ;
115+ }).returns (getOutputType ().getTypeAt (1 ));
116116
117- kafkaTableSink .emitDataStream (dataStream );
117+ kafkaTableSink .emitDataStream (ds );
118118 }
119119
120120 @ Override
121- public TypeInformation < Row > getOutputType () {
122- return new RowTypeInfo (fieldTypes , fieldNames );
121+ public TupleTypeInfo < Tuple2 < Boolean , Row > > getOutputType () {
122+ return new TupleTypeInfo ( org . apache . flink . table . api . Types . BOOLEAN (), new RowTypeInfo (fieldTypes , fieldNames ) );
123123 }
124124
125125 @ Override
@@ -133,10 +133,9 @@ public TypeInformation<?>[] getFieldTypes() {
133133 }
134134
135135 @ Override
136- public TableSink <Row > configure (String [] fieldNames , TypeInformation <?>[] fieldTypes ) {
136+ public TableSink <Tuple2 < Boolean , Row >> configure (String [] fieldNames , TypeInformation <?>[] fieldTypes ) {
137137 this .fieldNames = fieldNames ;
138138 this .fieldTypes = fieldTypes ;
139139 return this ;
140140 }
141-
142- }
141+ }
0 commit comments