2626import org .apache .flink .api .java .typeutils .RowTypeInfo ;
2727import org .apache .flink .streaming .api .datastream .DataStream ;
2828import org .apache .flink .streaming .connectors .kafka .KafkaTableSinkBase ;
29+ import org .apache .flink .streaming .connectors .kafka .partitioner .FlinkFixedPartitioner ;
30+ import org .apache .flink .streaming .connectors .kafka .partitioner .FlinkKafkaPartitioner ;
31+ import org .apache .flink .table .api .TableSchema ;
2932import org .apache .flink .table .sinks .AppendStreamTableSink ;
3033import org .apache .flink .table .sinks .TableSink ;
3134import org .apache .flink .types .Row ;
3235
36+ import java .util .Optional ;
3337import java .util .Properties ;
3438/**
3539 *
@@ -48,39 +52,60 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<K
4852
    /** Names and types of the sink table's columns — populated in genStreamSink; parallel arrays. */
4953     protected TypeInformation <?>[] fieldTypes ;
5054
55+ /** The schema of the table. */
56+ private TableSchema schema ;
57+
58+ /** The Kafka topic to write to. */
5159     protected String topic ;
5260
61+ /** Properties for the Kafka producer (bootstrap.servers plus user-supplied kafka params). */
5362     protected Properties properties ;
5463
5564     /** Serialization schema for encoding records to Kafka. */
    /* NOTE(review): may remain null when sinkDataType is not "json" — verify callers guard against this. */
5665     protected SerializationSchema serializationSchema ;
5766
67+ /** Partitioner to select Kafka partition for each item. */
    /* NOTE(review): Optional as a field is discouraged (Effective Java item 55); kept because the
       Flink KafkaTableSinkBase constructor takes Optional<FlinkKafkaPartitioner<Row>>. */
68+ protected Optional <FlinkKafkaPartitioner <Row >> partitioner ;
5869 @ Override
5970 public KafkaSink genStreamSink (TargetTableInfo targetTableInfo ) {
60- KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
61- this .topic = kafka10SinkTableInfo .getTopic ();
62- this .fieldNames = kafka10SinkTableInfo .getFields ();
63- TypeInformation [] types = new TypeInformation [kafka10SinkTableInfo .getFields ().length ];
64- for (int i = 0 ; i < kafka10SinkTableInfo .getFieldClasses ().length ; i ++) {
65- types [i ] = TypeInformation .of (kafka10SinkTableInfo .getFieldClasses ()[i ]);
71+ KafkaSinkTableInfo kafka010SinkTableInfo = (KafkaSinkTableInfo ) targetTableInfo ;
72+ this .topic = kafka010SinkTableInfo .getTopic ();
73+
74+ Properties props = new Properties ();
75+ props .setProperty ("bootstrap.servers" , kafka010SinkTableInfo .getBootstrapServers ());
76+
77+ for (String key :kafka010SinkTableInfo .getKafkaParamKeys ()) {
78+ props .setProperty (key , kafka010SinkTableInfo .getKafkaParam (key ));
79+ }
80+ this .properties = props ;
81+ this .partitioner = Optional .of (new FlinkFixedPartitioner <>());
82+ this .fieldNames = kafka010SinkTableInfo .getFields ();
83+ TypeInformation [] types = new TypeInformation [kafka010SinkTableInfo .getFields ().length ];
84+ for (int i = 0 ; i < kafka010SinkTableInfo .getFieldClasses ().length ; i ++){
85+ types [i ] = TypeInformation .of (kafka010SinkTableInfo .getFieldClasses ()[i ]);
6686 }
6787 this .fieldTypes = types ;
6888
69- properties = new Properties ();
70- for (String key : kafka10SinkTableInfo . getKafkaParamKeys ()) {
71- properties . setProperty ( key , kafka10SinkTableInfo . getKafkaParam ( key ) );
89+ TableSchema . Builder schemaBuilder = TableSchema . builder ();
90+ for (int i = 0 ; i < fieldNames . length ; i ++) {
91+ schemaBuilder . field ( fieldNames [ i ], fieldTypes [ i ] );
7292 }
73- properties . setProperty ( "bootstrap.servers" , kafka10SinkTableInfo . getBootstrapServers () );
93+ this . schema = schemaBuilder . build ( );
7494
75- this .serializationSchema = new CustomerJsonRowSerializationSchema (getOutputType ());
95+ //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
96+ if ("json" .equalsIgnoreCase (kafka010SinkTableInfo .getSinkDataType ())) {
97+ this .serializationSchema = new CustomerJsonRowSerializationSchema (getOutputType ());
98+ }
7699 return this ;
77100 }
78101
79102 @ Override
80103 public void emitDataStream (DataStream <Row > dataStream ) {
81104 KafkaTableSinkBase kafkaTableSink = new CustomerKafka10JsonTableSink (
105+ schema ,
82106 topic ,
83107 properties ,
108+ partitioner ,
84109 serializationSchema
85110 );
86111
0 commit comments