Skip to content

Commit ef2f2d2

Browse files
committed
add kafkaSink
1 parent fe961ec commit ef2f2d2

12 files changed

Lines changed: 978 additions & 0 deletions

File tree

kafka09/kafka09-sink/pom.xml

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- Child module of the kafka09 connector aggregator. -->
    <parent>
        <artifactId>sql.kafka09</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.kafka09</artifactId>
    <packaging>jar</packaging>

    <name>kafka09-sink</name>
    <url>http://maven.apache.org</url>

    <dependencies>
        <!-- JSON row (de)serialization support used by the sink's serialization schema. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build an uber jar; signature files are stripped so the shaded
                 jar does not fail signature verification at load time. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- After packaging, copy the shaded jar into the shared plugins
                 directory and rename it to the stable plugin name. -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../plugins/kafka09sink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar" />
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../plugins/kafka09sink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../plugins/kafka09sink/${project.name}.jar" />
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import java.util.Properties;

/**
 * Append-only table sink that writes rows to a Kafka 0.9 topic as JSON.
 *
 * <p>Configured from a {@link KafkaSinkTableInfo} via {@link #genStreamSink(TargetTableInfo)}
 * and emitted through a {@code CustomerKafka09JsonTableSink}.
 *
 * Date: 2018/12/18
 * Company: www.dtstack.com
 *
 * @author DocLi
 * @modifyer maqi
 */
public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {

	/** Output field names, taken from the sink table DDL. */
	protected String[] fieldNames;

	/** Output field types, parallel to {@link #fieldNames}. */
	protected TypeInformation<?>[] fieldTypes;

	/** Target Kafka topic. */
	protected String topic;

	/** Kafka producer properties (currently only {@code bootstrap.servers}). */
	protected Properties properties;

	/** Serialization schema for encoding records to Kafka. */
	protected SerializationSchema<Row> serializationSchema;

	/**
	 * Initializes this sink from the parsed sink-table definition.
	 *
	 * @param targetTableInfo must be a {@link KafkaSinkTableInfo}
	 * @return this sink, fully configured
	 */
	@Override
	public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
		KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
		this.topic = kafka09SinkTableInfo.getTopic();
		this.fieldNames = kafka09SinkTableInfo.getFields();

		// Size the type array from the same array the loop iterates
		// (fieldClasses). The original sized it from getFields().length while
		// iterating getFieldClasses().length, which could throw
		// ArrayIndexOutOfBoundsException or leave null entries if the two
		// arrays ever diverged.
		Class<?>[] fieldClasses = kafka09SinkTableInfo.getFieldClasses();
		TypeInformation<?>[] types = new TypeInformation<?>[fieldClasses.length];
		for (int i = 0; i < fieldClasses.length; i++) {
			types[i] = TypeInformation.of(fieldClasses[i]);
		}
		this.fieldTypes = types;

		properties = new Properties();
		properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());

		// Row layout must be fixed (fieldNames/fieldTypes set above) before
		// building the JSON schema from getOutputType().
		this.serializationSchema = new JsonRowSerializationSchema(getOutputType());
		return this;
	}

	/**
	 * Writes the given stream to Kafka through a kafka09 JSON table sink.
	 *
	 * @param dataStream stream of rows matching {@link #getOutputType()}
	 */
	@Override
	public void emitDataStream(DataStream<Row> dataStream) {
		KafkaTableSink kafkaTableSink = new CustomerKafka09JsonTableSink(
				topic,
				properties,
				serializationSchema
		);

		kafkaTableSink.emitDataStream(dataStream);
	}

	/** @return the row type assembled from the configured field names and types */
	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	/**
	 * Table API callback that re-binds the field layout; returns this same
	 * instance rather than a copy (callers rely on the fluent contract only).
	 */
	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		this.fieldNames = fieldNames;
		this.fieldTypes = fieldTypes;
		return this;
	}

}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka.table;

import com.dtstack.flink.sql.table.AbsTableParser;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.util.Map;

/**
 * Parses a kafka09 sink-table definition into a {@link KafkaSinkTableInfo}.
 *
 * Date: 2018/12/18
 * Company: www.dtstack.com
 * @author DocLi
 *
 * @modifyer maqi
 *
 */
public class KafkaSinkParser extends AbsTableParser {

	/**
	 * Builds and validates the sink table info from the DDL field list and
	 * the lower-cased property map.
	 */
	@Override
	public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
		KafkaSinkTableInfo tableInfo = new KafkaSinkTableInfo();
		tableInfo.setName(tableName);
		parseFieldsInfo(fieldsInfo, tableInfo);

		// Property keys arrive lower-cased, so look them up that way.
		String servers = MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()));
		String topic = MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase()));
		tableInfo.setBootstrapServers(servers);
		tableInfo.setTopic(topic);

		tableInfo.check();
		return tableInfo;
	}
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka.table;

import com.dtstack.flink.sql.table.TargetTableInfo;
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Configuration holder for a kafka09 sink table: target topic and the
 * Kafka bootstrap servers, parsed from the table DDL properties.
 *
 * Date: 2018/12/18
 * Company: www.dtstack.com
 *
 * @author DocLi
 * @modifyer maqi
 */
public class KafkaSinkTableInfo extends TargetTableInfo {

	//version
	private static final String CURR_TYPE = "kafka09";

	/** DDL property key for the Kafka bootstrap servers list. */
	public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";

	/** DDL property key for the target topic. */
	public static final String TOPIC_KEY = "topic";

	private String bootstrapServers;

	private String topic;

	public KafkaSinkTableInfo() {
		super.setType(CURR_TYPE);
	}


	public String getBootstrapServers() {
		return bootstrapServers;
	}

	public void setBootstrapServers(String bootstrapServers) {
		this.bootstrapServers = bootstrapServers;
	}

	public String getTopic() {
		return topic;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}

	/**
	 * Validates that the required properties are present.
	 *
	 * @return {@code true} when validation passes
	 * @throws NullPointerException if bootstrapServers or topic is missing
	 */
	@Override
	public boolean check() {
		Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
		Preconditions.checkNotNull(topic, "kafka of topic is required");
		// BUGFIX: this point is only reached when both preconditions passed,
		// so report success instead of the original unconditional `false`,
		// which made any caller that tested the result treat a valid
		// configuration as invalid.
		return true;
	}

	@Override
	public String getType() {
		return super.getType();
	}
}

0 commit comments

Comments
 (0)