Skip to content

Commit fe961ec

Browse files
committed
add kafkaSink
1 parent 6144eae commit fe961ec

7 files changed

Lines changed: 265 additions & 3 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir)
5151
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
5252

5353
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
54-
5554
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
56-
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
55+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
56+
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
5757
Class<?> targetParser = dtClassLoader.loadClass(className);
5858

5959
if(!AbsTableParser.class.isAssignableFrom(targetParser)){
@@ -77,7 +77,8 @@ public static TableSink getTableSink(TargetTableInfo targetTableInfo, String loc
7777

7878
PluginUtil.addPluginJar(pluginJarDirPath, dtClassLoader);
7979

80-
String className = PluginUtil.getGenerClassName(pluginType, CURR_TYPE);
80+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
81+
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
8182
Class<?> sinkClass = dtClassLoader.loadClass(className);
8283

8384
if(!IStreamSinkGener.class.isAssignableFrom(sinkClass)){
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dtstack.flink.sql.sink.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
 * Kafka 0.9 JSON table sink that emits rows with a caller-supplied
 * {@link SerializationSchema} instead of the JSON schema built by the
 * parent {@link KafkaJsonTableSink}.
 *
 * Reason: add schema info
 * Date: 2019/4/8
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public class CustomerKafka09JsonTableSink extends KafkaJsonTableSink {

    /**
     * Caller-supplied serialization schema; {@link #emitDataStream(DataStream)}
     * uses this instead of the schema the parent class would derive.
     */
    protected SerializationSchema<Row> schema;

    /**
     * Creates the sink with the default fixed partitioner.
     *
     * @param topic      Kafka topic to write to
     * @param properties Kafka producer configuration
     * @param schema     schema used to serialize each {@link Row}
     */
    public CustomerKafka09JsonTableSink(String topic, Properties properties, SerializationSchema<Row> schema) {
        super(topic, properties, new FlinkFixedPartitioner<>());
        this.schema = schema;
    }

    /**
     * Creates the sink with an explicit Flink partitioner.
     *
     * @param topic       Kafka topic to write to
     * @param properties  Kafka producer configuration
     * @param partitioner maps each row to a Kafka partition
     * @param schema      schema used to serialize each {@link Row}
     */
    public CustomerKafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema<Row> schema) {
        super(topic, properties, partitioner);
        this.schema = schema;
    }

    /**
     * Creates the sink with a legacy {@link KafkaPartitioner}.
     *
     * @deprecated use the {@link FlinkKafkaPartitioner} constructor instead
     */
    @Deprecated
    public CustomerKafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema<Row> schema) {
        super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
        this.schema = schema;
    }

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
        return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
    }

    @Override
    protected CustomerKafka09JsonTableSink createCopy() {
        // Bug fix: the copy must be a CustomerKafka09JsonTableSink carrying the
        // custom schema. Returning a plain Kafka09JsonTableSink silently dropped
        // the schema this class exists to provide.
        return new CustomerKafka09JsonTableSink(topic, properties, partitioner, schema);
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // Use the caller-supplied schema rather than the parent's derived one.
        FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
        // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
        kafkaProducer.setFlushOnCheckpoint(true);
        dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }
}

kafka09/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,17 @@
1515

1616
<modules>
1717
<module>kafka09-source</module>
18+
<module>kafka09-sink</module>
1819
</modules>
1920

2021
<dependencies>
22+
23+
<dependency>
24+
<groupId>org.apache.flink</groupId>
25+
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
26+
<version>${flink.version}</version>
27+
</dependency>
28+
2129
<dependency>
2230
<groupId>junit</groupId>
2331
<artifactId>junit</artifactId>
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dtstack.flink.sql.sink.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
 * Kafka 0.10 JSON table sink that emits rows with a caller-supplied
 * {@link SerializationSchema} instead of the JSON schema built by the
 * parent {@link KafkaJsonTableSink}.
 *
 * Reason: add schema info
 * Date: 2019/4/8
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public class CustomerKafka10JsonTableSink extends KafkaJsonTableSink {

    /**
     * Caller-supplied serialization schema; {@link #emitDataStream(DataStream)}
     * uses this instead of the schema the parent class would derive.
     */
    protected SerializationSchema<Row> schema;

    /**
     * Creates the sink with the default fixed partitioner.
     *
     * @param topic      Kafka topic to write to
     * @param properties Kafka producer configuration
     * @param schema     schema used to serialize each {@link Row}
     */
    public CustomerKafka10JsonTableSink(String topic, Properties properties, SerializationSchema<Row> schema) {
        super(topic, properties, new FlinkFixedPartitioner<>());
        this.schema = schema;
    }

    /**
     * Creates the sink with an explicit Flink partitioner.
     *
     * @param topic       Kafka topic to write to
     * @param properties  Kafka producer configuration
     * @param partitioner maps each row to a Kafka partition
     * @param schema      schema used to serialize each {@link Row}
     */
    public CustomerKafka10JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema<Row> schema) {
        super(topic, properties, partitioner);
        this.schema = schema;
    }

    /**
     * Creates the sink with a legacy {@link KafkaPartitioner}.
     *
     * @deprecated use the {@link FlinkKafkaPartitioner} constructor instead
     */
    @Deprecated
    public CustomerKafka10JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema<Row> schema) {
        super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
        this.schema = schema;
    }

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
        return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    }

    @Override
    protected CustomerKafka10JsonTableSink createCopy() {
        // Bug fix: the original copy-pasted code returned a Kafka09JsonTableSink
        // from the Kafka 0.10 module AND dropped the custom schema. The copy must
        // be a CustomerKafka10JsonTableSink carrying the same schema.
        return new CustomerKafka10JsonTableSink(topic, properties, partitioner, schema);
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // Use the caller-supplied schema rather than the parent's derived one.
        FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
        // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
        kafkaProducer.setFlushOnCheckpoint(true);
        dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }
}

kafka10/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,15 @@
1414

1515
<modules>
1616
<module>kafka10-source</module>
17+
<module>kafka10-sink</module>
1718
</modules>
1819

1920
<dependencies>
21+
<dependency>
22+
<groupId>org.apache.flink</groupId>
23+
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
24+
<version>${flink.version}</version>
25+
</dependency>
2026
<dependency>
2127
<groupId>junit</groupId>
2228
<artifactId>junit</artifactId>
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dtstack.flink.sql.sink.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
 * Kafka 0.11 JSON table sink that emits rows with a caller-supplied
 * {@link SerializationSchema} instead of the JSON schema built by the
 * parent {@link KafkaJsonTableSink}.
 *
 * NOTE(review): this sink currently writes through FlinkKafkaProducer010.
 * The 0.10 producer is wire-compatible with 0.11 brokers for at-least-once
 * delivery, but the transactional FlinkKafkaProducer011 does not extend
 * FlinkKafkaProducerBase and cannot be returned from createKafkaProducer()
 * without changing the override — confirm whether exactly-once is required here.
 *
 * Reason: add schema info
 * Date: 2019/4/8
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public class CustomerKafka11JsonTableSink extends KafkaJsonTableSink {

    /**
     * Caller-supplied serialization schema; {@link #emitDataStream(DataStream)}
     * uses this instead of the schema the parent class would derive.
     */
    protected SerializationSchema<Row> schema;

    /**
     * Creates the sink with the default fixed partitioner.
     *
     * @param topic      Kafka topic to write to
     * @param properties Kafka producer configuration
     * @param schema     schema used to serialize each {@link Row}
     */
    public CustomerKafka11JsonTableSink(String topic, Properties properties, SerializationSchema<Row> schema) {
        super(topic, properties, new FlinkFixedPartitioner<>());
        this.schema = schema;
    }

    /**
     * Creates the sink with an explicit Flink partitioner.
     *
     * @param topic       Kafka topic to write to
     * @param properties  Kafka producer configuration
     * @param partitioner maps each row to a Kafka partition
     * @param schema      schema used to serialize each {@link Row}
     */
    public CustomerKafka11JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema<Row> schema) {
        super(topic, properties, partitioner);
        this.schema = schema;
    }

    /**
     * Creates the sink with a legacy {@link KafkaPartitioner}.
     *
     * @deprecated use the {@link FlinkKafkaPartitioner} constructor instead
     */
    @Deprecated
    public CustomerKafka11JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema<Row> schema) {
        super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
        this.schema = schema;
    }

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
        // See the class-level NOTE: intentionally the 0.10 producer (same as the
        // original code) because FlinkKafkaProducer011 is not a FlinkKafkaProducerBase.
        return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    }

    @Override
    protected CustomerKafka11JsonTableSink createCopy() {
        // Bug fix: the original copy-pasted code returned a Kafka09JsonTableSink
        // from the Kafka 0.11 module AND dropped the custom schema. The copy must
        // be a CustomerKafka11JsonTableSink carrying the same schema.
        return new CustomerKafka11JsonTableSink(topic, properties, partitioner, schema);
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // Use the caller-supplied schema rather than the parent's derived one.
        FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
        // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
        kafkaProducer.setFlushOnCheckpoint(true);
        dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }
}

kafka11/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,16 @@
1414

1515
<modules>
1616
<module>kafka11-source</module>
17+
<module>kafka11-sink</module>
1718
</modules>
1819

1920
<dependencies>
21+
<dependency>
22+
<groupId>org.apache.flink</groupId>
23+
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
24+
<version>${flink.version}</version>
25+
</dependency>
26+
2027
<dependency>
2128
<groupId>junit</groupId>
2229
<artifactId>junit</artifactId>

0 commit comments

Comments
 (0)