Skip to content

Commit 873884e

Browse files
committed
Merge branch 'v1.5.0_dev_add_kafkasink' into 'v1.5.0_dev'
V1.5.0 dev add kafkasink See merge request !8
2 parents 6144eae + ef2f2d2 commit 873884e

19 files changed

Lines changed: 1243 additions & 3 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir)
5151
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
5252

5353
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
54-
5554
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
56-
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
55+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
56+
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
5757
Class<?> targetParser = dtClassLoader.loadClass(className);
5858

5959
if(!AbsTableParser.class.isAssignableFrom(targetParser)){
@@ -77,7 +77,8 @@ public static TableSink getTableSink(TargetTableInfo targetTableInfo, String loc
7777

7878
PluginUtil.addPluginJar(pluginJarDirPath, dtClassLoader);
7979

80-
String className = PluginUtil.getGenerClassName(pluginType, CURR_TYPE);
80+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
81+
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
8182
Class<?> sinkClass = dtClassLoader.loadClass(className);
8283

8384
if(!IStreamSinkGener.class.isAssignableFrom(sinkClass)){

kafka09/kafka09-sink/pom.xml

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.kafka09</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.sink.kafka09</artifactId>
14+
<packaging>jar</packaging>
15+
16+
<name>kafka09-sink</name>
17+
<url>http://maven.apache.org</url>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.flink</groupId>
22+
<artifactId>flink-json</artifactId>
23+
<version>${flink.version}</version>
24+
</dependency>
25+
</dependencies>
26+
27+
<build>
28+
<plugins>
29+
<plugin>
30+
<groupId>org.apache.maven.plugins</groupId>
31+
<artifactId>maven-shade-plugin</artifactId>
32+
<version>1.4</version>
33+
<executions>
34+
<execution>
35+
<phase>package</phase>
36+
<goals>
37+
<goal>shade</goal>
38+
</goals>
39+
<configuration>
40+
<artifactSet>
41+
<excludes>
42+
43+
</excludes>
44+
</artifactSet>
45+
<filters>
46+
<filter>
47+
<artifact>*:*</artifact>
48+
<excludes>
49+
<exclude>META-INF/*.SF</exclude>
50+
<exclude>META-INF/*.DSA</exclude>
51+
<exclude>META-INF/*.RSA</exclude>
52+
</excludes>
53+
</filter>
54+
</filters>
55+
</configuration>
56+
</execution>
57+
</executions>
58+
</plugin>
59+
60+
<plugin>
61+
<artifactId>maven-antrun-plugin</artifactId>
62+
<version>1.2</version>
63+
<executions>
64+
<execution>
65+
<id>copy-resources</id>
66+
<!-- run during 'package' so the shaded jar exists before it is copied into the plugins directory -->
67+
<phase>package</phase>
68+
<goals>
69+
<goal>run</goal>
70+
</goals>
71+
<configuration>
72+
<tasks>
73+
<copy todir="${basedir}/../../plugins/kafka09sink">
74+
<fileset dir="target/">
75+
<include name="${project.artifactId}-${project.version}.jar" />
76+
</fileset>
77+
</copy>
78+
79+
<move file="${basedir}/../../plugins/kafka09sink/${project.artifactId}-${project.version}.jar"
80+
tofile="${basedir}/../../plugins/kafka09sink/${project.name}.jar" />
81+
</tasks>
82+
</configuration>
83+
</execution>
84+
</executions>
85+
</plugin>
86+
</plugins>
87+
</build>
88+
89+
90+
</project>
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flink.sql.sink.kafka;
19+
20+
import org.apache.flink.api.common.serialization.SerializationSchema;
21+
import org.apache.flink.streaming.api.datastream.DataStream;
22+
import org.apache.flink.streaming.connectors.kafka.*;
23+
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
24+
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
25+
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
26+
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
27+
import org.apache.flink.table.util.TableConnectorUtil;
28+
import org.apache.flink.types.Row;
29+
30+
import java.util.Properties;
31+
32+
/**
33+
* Reason: add schema info
34+
* Date: 2019/4/8
35+
* Company: www.dtstack.com
36+
*
37+
* @author maqi
38+
*/
39+
public class CustomerKafka09JsonTableSink extends KafkaJsonTableSink {
40+
41+
42+
protected SerializationSchema schema;
43+
44+
public CustomerKafka09JsonTableSink(String topic, Properties properties, SerializationSchema schema) {
45+
super(topic, properties, new FlinkFixedPartitioner<>());
46+
this.schema = schema;
47+
}
48+
49+
public CustomerKafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema schema) {
50+
super(topic, properties, partitioner);
51+
this.schema = schema;
52+
}
53+
54+
55+
@Deprecated
56+
public CustomerKafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema schema) {
57+
super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
58+
this.schema = schema;
59+
}
60+
61+
@Override
62+
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
63+
return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
64+
}
65+
66+
@Override
67+
protected Kafka09JsonTableSink createCopy() {
68+
return new Kafka09JsonTableSink(topic, properties, partitioner);
69+
}
70+
71+
@Override
72+
public void emitDataStream(DataStream<Row> dataStream) {
73+
FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
74+
// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
75+
kafkaProducer.setFlushOnCheckpoint(true);
76+
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
77+
}
78+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.kafka;
20+
21+
import com.dtstack.flink.sql.sink.IStreamSinkGener;
22+
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
23+
import com.dtstack.flink.sql.table.TargetTableInfo;
24+
import org.apache.flink.api.common.serialization.SerializationSchema;
25+
import org.apache.flink.api.common.typeinfo.TypeInformation;
26+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
27+
import org.apache.flink.formats.json.JsonRowSerializationSchema;
28+
import org.apache.flink.streaming.api.datastream.DataStream;
29+
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
30+
import org.apache.flink.table.sinks.AppendStreamTableSink;
31+
import org.apache.flink.table.sinks.TableSink;
32+
import org.apache.flink.types.Row;
33+
import java.util.Properties;
34+
35+
/**
36+
* Date: 2018/12/18
37+
* Company: www.dtstack.com
38+
*
39+
* @author DocLi
40+
* @modifyer maqi
41+
*/
42+
public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
43+
44+
protected String[] fieldNames;
45+
46+
protected TypeInformation<?>[] fieldTypes;
47+
48+
protected String topic;
49+
50+
protected Properties properties;
51+
52+
/** Serialization schema for encoding records to Kafka. */
53+
protected SerializationSchema serializationSchema;
54+
55+
@Override
56+
public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
57+
KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
58+
this.topic = kafka09SinkTableInfo.getTopic();
59+
this.fieldNames = kafka09SinkTableInfo.getFields();
60+
TypeInformation[] types = new TypeInformation[kafka09SinkTableInfo.getFields().length];
61+
for (int i = 0; i < kafka09SinkTableInfo.getFieldClasses().length; i++) {
62+
types[i] = TypeInformation.of(kafka09SinkTableInfo.getFieldClasses()[i]);
63+
}
64+
this.fieldTypes = types;
65+
66+
properties = new Properties();
67+
properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
68+
69+
this.serializationSchema = new JsonRowSerializationSchema(getOutputType());
70+
return this;
71+
}
72+
73+
@Override
74+
public void emitDataStream(DataStream<Row> dataStream) {
75+
KafkaTableSink kafkaTableSink = new CustomerKafka09JsonTableSink(
76+
topic,
77+
properties,
78+
serializationSchema
79+
);
80+
81+
kafkaTableSink.emitDataStream(dataStream);
82+
}
83+
84+
@Override
85+
public TypeInformation<Row> getOutputType() {
86+
return new RowTypeInfo(fieldTypes, fieldNames);
87+
}
88+
89+
@Override
90+
public String[] getFieldNames() {
91+
return fieldNames;
92+
}
93+
94+
@Override
95+
public TypeInformation<?>[] getFieldTypes() {
96+
return fieldTypes;
97+
}
98+
99+
@Override
100+
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
101+
this.fieldNames = fieldNames;
102+
this.fieldTypes = fieldTypes;
103+
return this;
104+
}
105+
106+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.kafka.table;
20+
21+
import com.dtstack.flink.sql.table.AbsTableParser;
22+
import com.dtstack.flink.sql.table.TableInfo;
23+
import com.dtstack.flink.sql.util.MathUtil;
24+
25+
import java.util.Map;
26+
27+
/**
28+
* Date: 2018/12/18
29+
* Company: www.dtstack.com
30+
* @author DocLi
31+
*
32+
* @modifyer maqi
33+
*
34+
*/
35+
public class KafkaSinkParser extends AbsTableParser {
36+
@Override
37+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
38+
KafkaSinkTableInfo kafka09SinkTableInfo = new KafkaSinkTableInfo();
39+
kafka09SinkTableInfo.setName(tableName);
40+
parseFieldsInfo(fieldsInfo, kafka09SinkTableInfo);
41+
42+
43+
kafka09SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
44+
kafka09SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
45+
kafka09SinkTableInfo.check();
46+
return kafka09SinkTableInfo;
47+
}
48+
}

0 commit comments

Comments
 (0)