Commit a425993

Authored by yanxi
Merge pull request #47 from zhihui-ge/master
update kafka source
2 parents 35f8567 + 277bedc commit a425993

8 files changed: 77 additions & 26 deletions

File tree

README.md
core/src/main/java/com/dtstack/flink/sql/Main.java
docs/kafkaSource.md
kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java
redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

README.md

Lines changed: 13 additions & 5 deletions
@@ -4,6 +4,7 @@
 > > * Custom CREATE VIEW syntax
 > > * Custom CREATE FUNCTION syntax
 > > * Joins between streams and dimension (side) tables
+> > * Supports all native Flink SQL syntax

 # Supported
 * Source tables: Kafka 0.9 and 1.x
@@ -14,6 +15,8 @@
 * Added Oracle dimension-table and result-table support
 * Added SQL Server dimension-table and result-table support
 * Added Kafka result-table support
+* Added CEP support in SQL
+* Dimension-table snapshots

 ## 1 Quick Start
 ### 1.1 Run Modes
@@ -26,7 +29,7 @@
 ### 1.2 Execution Environment

 * Java: JDK 8 or later
-* Flink cluster: 1.4 (local mode does not require a Flink cluster)
+* Flink cluster: 1.4 or 1.5 (local mode does not require a Flink cluster)
 * Operating system: no restrictions in principle

 ### 1.3 Packaging
@@ -44,12 +47,12 @@ mvn clean package -Dmaven.test.skip
 #### 1.4.1 Launch Command

 ```
-sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp {\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000}
+sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
 ```

 #### 1.4.2 Command-Line Options

-* **model**
+* **mode**
   * Description: execution mode, i.e. the working mode of the Flink cluster
   * local: local mode
   * standalone: submit to a Flink cluster in standalone deployment mode
@@ -80,6 +83,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack

 * **addjar**
   * Description: extension jar paths, currently mainly jars containing UDF definitions
+  * Format: JSON
   * Required: no
   * Default: none

@@ -180,12 +184,16 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 ## 4 Example

 ```
+
+CREATE (scala|table) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun
+
+
 CREATE TABLE MyTable(
     name varchar,
     channel varchar,
     pv int,
     xctime bigint,
-    CHARACTER_LENGTH(channel) AS timeLeng
+    CHARACTER_LENGTH(channel) AS timeLeng // user-defined function
 )WITH(
     type ='kafka09',
     bootstrapServers ='172.16.8.198:9092',
@@ -223,7 +231,7 @@ CREATE TABLE sideTable(
     cf:name varchar as name,
     cf:info varchar as info,
     PRIMARY KEY(name),
-    PERIOD FOR SYSTEM_TIME
+    PERIOD FOR SYSTEM_TIME // dimension-table marker
 )WITH(
     type ='hbase',
     zookeeperQuorum ='rdos1:2181',

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 4 deletions
@@ -91,8 +91,6 @@ public class Main {

     private static final Logger LOG = LoggerFactory.getLogger(Main.class);

-    private static final String LOCAL_MODE = "local";
-
     private static final int failureRate = 3;

     private static final int failureInterval = 6; //min
@@ -141,7 +139,7 @@ public static void main(String[] args) throws Exception {
         Thread.currentThread().setContextClassLoader(dtClassLoader);

         URLClassLoader parentClassloader;
-        if(!LOCAL_MODE.equals(deployMode)){
+        if(!ClusterMode.local.name().equals(deployMode)){
             parentClassloader = (URLClassLoader) threadClassLoader.getParent();
         }else{
             parentClassloader = dtClassLoader;
@@ -313,7 +311,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
     }

     private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
-        StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ?
+        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();
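The `ClusterMode` enum that replaces the deleted `LOCAL_MODE` constant is not shown in this diff. As context, here is a minimal sketch of what it plausibly looks like, assuming the three execution modes the README lists (`local`, `standalone`, `yarn`) — the actual enum in the repository may differ:

```java
// Hypothetical sketch of the ClusterMode enum referenced by the diff above.
public enum ClusterMode {
    local,      // run in an embedded MyLocalStreamEnvironment, no cluster required
    standalone, // submit to a standalone Flink cluster
    yarn        // submit to a Flink cluster managed by YARN
}
```

Comparing against `ClusterMode.local.name()` keeps the deploy-mode strings defined in one place instead of re-declaring a `LOCAL_MODE` constant in every class that inspects the mode.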

docs/kafkaSource.md

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ CREATE TABLE tableName(
 |bootstrapServers | Kafka bootstrap-server addresses (comma-separated)|||
 |zookeeperQuorum | Kafka ZooKeeper addresses (comma-separated)|||
 |topic | name of the topic to read|||
-|offsetReset | initial offset position in the topic [latest\|earliest]||latest|
+|offsetReset | initial offset position in the topic [latest\|earliest\|specific offsets, e.g. {"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value}]||latest|
 |parallelism | degree of parallelism||1|

 ## 5. Example:
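To make the new specific-offsets form concrete: each key in the JSON object is a partition number of the topic, and each value is the offset that partition should start reading from. Below is a minimal sketch of how such a string can be turned into the map that Flink's `setStartFromSpecificOffsets` expects. This is an illustration only — it parses with Jackson directly, whereas the connector code in this commit goes through the project's `PluginUtil` helpers — and the class and method names are hypothetical:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;

public class OffsetResetParser {

    // Parses e.g. {"0":12312,"1":12321,"2":12312} into partition -> start offset.
    @SuppressWarnings("unchecked")
    static Map<KafkaTopicPartition, Long> parse(String topic, String offsetReset) throws Exception {
        Map<String, Object> raw = new ObjectMapper().readValue(offsetReset, Map.class);
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        for (Map.Entry<String, Object> entry : raw.entrySet()) {
            offsets.put(new KafkaTopicPartition(topic, Integer.valueOf(entry.getKey())),
                    Long.valueOf(entry.getValue().toString()));
        }
        return offsets;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parse("MyTopic", "{\"0\":12312,\"1\":12321,\"2\":12312}"));
    }
}
```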

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 18 additions & 1 deletion
@@ -23,16 +23,20 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;

 /**
@@ -76,7 +80,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
-        }else{
+        }else if(kafka09SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka09SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String,Object> entry:offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka09SourceTableInfo.getOffsetReset());
+            }
+        }else {
             kafkaSrc.setStartFromLatest();
         }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 18 additions & 1 deletion
@@ -23,15 +23,19 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;

 /**
@@ -75,7 +79,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka010SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
-        }else{
+        }else if(kafka010SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka010SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String,Object> entry:offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka010SourceTableInfo.getOffsetReset());
+            }
+        }else {
             kafkaSrc.setStartFromLatest();
         }

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 17 additions & 0 deletions
@@ -23,15 +23,19 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;

 /**
@@ -75,6 +79,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka011SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
+        }else if(kafka011SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka011SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String,Object> entry:offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka011SourceTableInfo.getOffsetReset());
+            }
         }else{
             kafkaSrc.setStartFromLatest();
         }
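The startup-mode selection above is now duplicated verbatim across the kafka09, kafka10, and kafka11 sources. Since all three consumer classes extend `FlinkKafkaConsumerBase`, the logic could live in one shared helper; the sketch below (not part of this commit, names hypothetical) shows the idea, reusing the `OffsetResetParser` sketch from earlier:

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public final class KafkaStartupModes {

    private KafkaStartupModes() {
    }

    // Applies the offsetReset table property to a Kafka consumer of any supported version.
    static <T> void applyOffsetReset(FlinkKafkaConsumerBase<T> consumer, String topic, String offsetReset) {
        if ("earliest".equalsIgnoreCase(offsetReset)) {
            consumer.setStartFromEarliest();
        } else if (offsetReset != null && offsetReset.startsWith("{")) {
            try {
                // JSON object: partition number -> starting offset
                consumer.setStartFromSpecificOffsets(OffsetResetParser.parse(topic, offsetReset));
            } catch (Exception e) {
                throw new RuntimeException("not support offsetReset type:" + offsetReset, e);
            }
        } else {
            consumer.setStartFromLatest();
        }
    }
}
```

Note that, per the Flink documentation, `setStartFromSpecificOffsets` falls back to the default group-offset behaviour for any partition of the topic that is not listed in the map.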

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java

Lines changed: 0 additions & 1 deletion
@@ -71,7 +71,6 @@ public MysqlAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
         super(new MysqlAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }

-
     @Override
     protected Row fillData(Row input, Object sideInput) {
         Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 8 additions & 13 deletions
@@ -144,14 +144,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         List<String> value = async.keys(key + ":*").get();
         String[] values = value.toArray(new String[value.size()]);
         RedisFuture<List<KeyValue<String, String>>> future = async.mget(values);
-        while (future.isDone()){
-            try {
-                List<KeyValue<String, String>> kvList = future.get();
-                if (kvList.size() != 0){
-                    for (int i=0; i<kvList.size(); i++){
-                        String[] splitKeys = kvList.get(i).getKey().split(":");
+        future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
+            @Override
+            public void accept(List<KeyValue<String, String>> keyValues) {
+                if (keyValues.size() != 0){
+                    for (int i=0; i<keyValues.size(); i++){
+                        String[] splitKeys = keyValues.get(i).getKey().split(":");
                         keyValue.put(splitKeys[1], splitKeys[2]);
-                        keyValue.put(splitKeys[3], kvList.get(i).getValue());
+                        keyValue.put(splitKeys[3], keyValues.get(i).getValue());
                     }
                     Row row = fillData(input, keyValue);
                     resultFuture.complete(Collections.singleton(row));
@@ -164,13 +164,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                     putCache(key, CacheMissVal.getMissKeyObj());
                 }
             }
-            } catch (InterruptedException e1) {
-                e1.printStackTrace();
-            } catch (ExecutionException e1) {
-                e1.printStackTrace();
             }
-        }
-
+        });
     }

     private String buildCacheKey(List<String> keyData) {
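This change is worth spelling out. The old code tested `future.isDone()` immediately after issuing the async `mget`: if the reply had not arrived yet (the usual case), the whole block was silently skipped and `resultFuture` was never completed; if it had arrived, the `while` loop would re-process the same reply forever. Registering a `thenAccept` callback instead runs the handler exactly once, when the reply arrives, without blocking the operator thread — lettuce's `RedisFuture` implements `CompletionStage`, which is what makes `thenAccept` available on the `mget` result. A minimal sketch of the difference, using a plain `CompletableFuture`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CallbackVsPolling {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // simulate redis latency
            } catch (InterruptedException ignored) {
            }
            return "value from redis";
        });

        // Old style: isDone() is almost always false right after the call,
        // so this branch is skipped and the result is lost.
        if (future.isDone()) {
            System.out.println("polled: " + future.get());
        }

        // New style: the callback fires exactly once, when the value arrives.
        future.thenAccept(v -> System.out.println("callback: " + v))
                .get(); // block here only so the demo JVM waits for the callback
    }
}
```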
