Skip to content

Commit c7747d7

Browse files
committed
support kafka parameter
1 parent 873884e commit c7747d7

9 files changed

Lines changed: 72 additions & 8 deletions

File tree

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
6464
this.fieldTypes = types;
6565

6666
properties = new Properties();
67+
for (String key:kafka09SinkTableInfo.getKafkaParamKeys()) {
68+
properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
69+
}
6770
properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
6871

6972
this.serializationSchema = new JsonRowSerializationSchema(getOutputType());

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4242

4343
kafka09SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4444
kafka09SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
for (String key:props.keySet()) {
46+
if (!key.isEmpty() && key.startsWith("kafka.")) {
47+
kafka09SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
48+
}
49+
}
4550
kafka09SinkTableInfo.check();
4651
return kafka09SinkTableInfo;
4752
}

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,23 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
4444

4545
private String topic;
4646

47+
public Map<String,String> kafkaParam = new HashMap<String,String>();
48+
4749
public KafkaSinkTableInfo() {
4850
super.setType(CURR_TYPE);
4951
}
5052

53+
public void addKafkaParam(String key,String value){
54+
kafkaParam.put(key,value);
55+
}
56+
57+
public String getKafkaParam(String key){
58+
return kafkaParam.get(key);
59+
}
60+
61+
public Set<String> getKafkaParamKeys(){
62+
return kafkaParam.keySet();
63+
}
5164

5265
public String getBootstrapServers() {
5366
return bootstrapServers;

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
6969
this.fieldTypes = types;
7070

7171
properties = new Properties();
72+
for (String key:kafka10SinkTableInfo.getKafkaParamKeys()) {
73+
properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
74+
}
7275
properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
7376

7477
this.serializationSchema = new JsonRowSerializationSchema(getOutputType());

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4242

4343
kafka10SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4444
kafka10SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
for (String key:props.keySet()) {
46+
if (!key.isEmpty() && key.startsWith("kafka.")) {
47+
kafka10SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
48+
}
49+
}
4550
kafka10SinkTableInfo.check();
4651
return kafka10SinkTableInfo;
4752
}

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,24 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
4444

4545
private String bootstrapServers;
4646

47+
public Map<String,String> kafkaParam = new HashMap<String,String>();
48+
4749
private String topic;
4850

4951
public KafkaSinkTableInfo() {
5052
super.setType(CURR_TYPE);
5153
}
54+
public void addKafkaParam(String key,String value){
55+
kafkaParam.put(key,value);
56+
}
57+
58+
public String getKafkaParam(String key){
59+
return kafkaParam.get(key);
60+
}
61+
62+
public Set<String> getKafkaParamKeys(){
63+
return kafkaParam.keySet();
64+
}
5265

5366

5467
public String getBootstrapServers() {

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,20 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<
5757

5858
@Override
5959
public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
60-
KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
61-
this.topic = kafka10SinkTableInfo.getTopic();
62-
this.fieldNames = kafka10SinkTableInfo.getFields();
63-
TypeInformation[] types = new TypeInformation[kafka10SinkTableInfo.getFields().length];
64-
for (int i = 0; i < kafka10SinkTableInfo.getFieldClasses().length; i++) {
65-
types[i] = TypeInformation.of(kafka10SinkTableInfo.getFieldClasses()[i]);
60+
KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
61+
this.topic = kafka11SinkTableInfo.getTopic();
62+
this.fieldNames = kafka11SinkTableInfo.getFields();
63+
TypeInformation[] types = new TypeInformation[kafka11SinkTableInfo.getFields().length];
64+
for (int i = 0; i < kafka11SinkTableInfo.getFieldClasses().length; i++) {
65+
types[i] = TypeInformation.of(kafka11SinkTableInfo.getFieldClasses()[i]);
6666
}
6767
this.fieldTypes = types;
6868

6969
properties = new Properties();
70-
properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
71-
70+
for (String key:kafka11SinkTableInfo.getKafkaParamKeys()) {
71+
properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
72+
}
73+
properties.setProperty("bootstrap.servers", kafka11SinkTableInfo.getBootstrapServers());
7274
this.serializationSchema = new JsonRowSerializationSchema(getOutputType());
7375
return this;
7476
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4242

4343
kafka11SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
4444
kafka11SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
for (String key:props.keySet()) {
46+
if (!key.isEmpty() && key.startsWith("kafka.")) {
47+
kafka11SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
48+
}
49+
}
4550
kafka11SinkTableInfo.check();
4651
return kafka11SinkTableInfo;
4752
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,25 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
4444

4545
private String topic;
4646

47+
public Map<String,String> kafkaParam = new HashMap<String,String>();
48+
4749
public KafkaSinkTableInfo() {
4850
super.setType(CURR_TYPE);
4951
}
5052

53+
public void addKafkaParam(String key,String value){
54+
kafkaParam.put(key,value);
55+
}
56+
57+
public String getKafkaParam(String key){
58+
return kafkaParam.get(key);
59+
}
60+
61+
public Set<String> getKafkaParamKeys(){
62+
return kafkaParam.keySet();
63+
}
64+
65+
5166

5267
public String getBootstrapServers() {
5368
return bootstrapServers;

0 commit comments

Comments
 (0)