
Commit d5cd251

Merge branch 'v1.5.0_dev' of https://github.com/DTStack/flinkStreamSQL into v1.5.0_dev
Author: sishu@dtstack.com (ysq)
2 parents 0af546d + a425993, commit d5cd251

18 files changed: 209 additions & 73 deletions

README.md

Lines changed: 57 additions & 8 deletions
@@ -4,6 +4,7 @@
 > > * Custom CREATE VIEW syntax
 > > * Custom CREATE FUNCTION syntax
 > > * Implements joins between streams and dimension tables
+> > * Supports all native Flink SQL syntax

 # Supported
 * Source tables: kafka 0.9, 1.x
@@ -14,6 +15,8 @@
 * Added oracle dimension-table and result-table support
 * Added SQLServer dimension-table and result-table support
 * Added kafka result-table support
+* Added CEP support in SQL
+* Dimension-table snapshots

 ## 1 Quick Start
 ### 1.1 Run Modes
@@ -26,7 +29,7 @@
 ### 1.2 Execution Environment

 * Java: JDK 8 or above
-* Flink cluster: 1.4 (local mode does not require installing a Flink cluster)
+* Flink cluster: 1.4, 1.5 (local mode does not require installing a Flink cluster)
 * Operating system: no restrictions in principle

 ### 1.3 Packaging
@@ -44,16 +47,17 @@ mvn clean package -Dmaven.test.skip
 #### 1.4.1 Launch Command

 ```
-sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp {\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000}
+sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
 ```

 #### 1.4.2 Command-Line Options

-* **model**
+* **mode**
   * Description: execution mode, i.e. the working mode of the Flink cluster
     * local: local mode
-    * standalone: a Flink cluster deployed in standalone mode
-    * yarn: a Flink cluster in yarn mode
+    * standalone: submit to a Flink cluster deployed in standalone mode
+    * yarn: submit to a Flink cluster in yarn mode (i.e. submit to an existing Flink cluster)
+    * yarnPer: submit in yarn per_job mode (i.e. create a new Flink application)
   * Required: no
   * Default: local

@@ -79,6 +83,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack

 * **addjar**
   * Description: paths of extension jars, currently mainly jars containing UDF definitions
+  * Format: json
   * Required: no
   * Default: none

@@ -97,6 +102,11 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
   * sql.max.concurrent.checkpoints: maximum number of concurrently generated checkpoints
   * sql.checkpoint.cleanup.mode: by default checkpoints are not kept in external storage; [true (the external storage is deleted after the job is cancelled) | false (the external storage must be deleted manually)]
   * flinkCheckpointDataURI: external storage path for checkpoints, set according to actual needs; hdfs://, file://
+  * jobmanager.memory.mb: jobmanager memory size in per_job mode (unit: MB, default 768)
+  * taskmanager.memory.mb: taskmanager memory size in per_job mode (unit: MB, default 768)
+  * taskmanager.num: number of taskmanager instances in per_job mode (default 1)
+  * taskmanager.slots: number of slots per taskmanager in per_job mode (default 1)
+  * [prometheus-related parameters](docs/prometheus.md): in per_job mode, metrics can be written to an external monitoring component, illustrated with prometheus pushgateway


 * **flinkconf**
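
For a per_job submission, the per_job resource settings listed above can be folded into the confProp argument of the 1.4.1 launch command. A minimal sketch, assuming the documented keys; the memory and slot numbers are illustrative, not recommendations:

```
-mode yarnPer -confProp \{\"jobmanager.memory.mb\":1024,\"taskmanager.memory.mb\":2048,\"taskmanager.num\":2,\"taskmanager.slots\":2,\"sql.checkpoint.interval\":10000\}
```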
@@ -118,6 +128,16 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
   * Description: flag indicating whether the savepoint is allowed to contain non-restored state
   * Required: no
   * Default: false
+
+* **flinkJarPath**
+  * Description: local directory holding the Flink jars, must be specified for per_job submission
+  * Required: no
+  * Default: none
+
+* **queue**
+  * Description: yarn queue to use in per_job mode
+  * Required: no
+  * Default: none

 ## 2 Structure
 ### 2.1 Source-Table Plugins
@@ -135,16 +155,45 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [mysql dimension-table plugin](docs/mysqlSide.md)
 * [mongo dimension-table plugin](docs/mongoSide.md)
 * [redis dimension-table plugin](docs/redisSide.md)
+
+## 3 Metrics (new)
+
+### kafka plugin
+* Business latency: flink_taskmanager_job_task_operator_dtEventDelay (unit: s)
+  The difference between the event time carried in the data itself and the current time at which it enters Flink.
+
+* Dirty data per input source: flink_taskmanager_job_task_operator_dtDirtyData
+  Records fetched from kafka that fail to parse are counted as dirty data.
+
+* Input TPS per source: flink_taskmanager_job_task_operator_dtNumRecordsInRate
+  Records received from kafka (before parsing) per second.
+
+* Input RPS per source: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate
+  Records received from kafka (after parsing) per second.
+
+* Input BPS per source: flink_taskmanager_job_task_operator_dtNumBytesInRate
+  Bytes received from kafka per second.
+
+* Per-partition lag of kafka input sources: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag
+  Currently collected by the kafka10 and kafka11 plugins.
+
+* Output RPS per sink: flink_taskmanager_job_task_operator_dtNumRecordsOutRate
+  External records written per second.

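Once these metrics are pushed to an external store such as prometheus (see docs/prometheus.md), each can be queried by name. For example, the business-latency gauge; the label filter is illustrative and depends on the reporter configuration:

```
flink_taskmanager_job_task_operator_dtEventDelay{job_name="xctest"}
```
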
-## 3 Samples
+## 4 Samples

 ```
+
+CREATE (scala|table) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun
+
 CREATE TABLE MyTable(
     name varchar,
     channel varchar,
     pv int,
     xctime bigint,
-    CHARACTER_LENGTH(channel) AS timeLeng
+    CHARACTER_LENGTH(channel) AS timeLeng // user-defined function
 )WITH(
     type ='kafka09',
     bootstrapServers ='172.16.8.198:9092',
@@ -182,7 +231,7 @@ CREATE TABLE sideTable(
     cf:name varchar as name,
     cf:info varchar as info,
     PRIMARY KEY(name),
-    PERIOD FOR SYSTEM_TIME
+    PERIOD FOR SYSTEM_TIME // dimension-table marker
 )WITH(
     type ='hbase',
     zookeeperQuorum ='rdos1:2181',

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 4 deletions
@@ -91,8 +91,6 @@ public class Main {

     private static final Logger LOG = LoggerFactory.getLogger(Main.class);

-    private static final String LOCAL_MODE = "local";
-
     private static final int failureRate = 3;

     private static final int failureInterval = 6; //min
@@ -141,7 +139,7 @@ public static void main(String[] args) throws Exception {
         Thread.currentThread().setContextClassLoader(dtClassLoader);

         URLClassLoader parentClassloader;
-        if(!LOCAL_MODE.equals(deployMode)){
+        if(!ClusterMode.local.name().equals(deployMode)){
             parentClassloader = (URLClassLoader) threadClassLoader.getParent();
         }else{
             parentClassloader = dtClassLoader;
@@ -313,7 +311,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
     }

     private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
-        StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ?
+        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();
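
The replacement above assumes a ClusterMode enum elsewhere in the codebase; its definition and import are not part of this diff. A minimal sketch consistent with the -mode values documented in the README (the package name is a guess):

```java
package com.dtstack.flink.sql.enums; // hypothetical location, not shown in this commit

/**
 * Sketch of the ClusterMode enum referenced by Main.java:
 * one constant per documented -mode value.
 */
public enum ClusterMode {
    local,      // run in MyLocalStreamEnvironment
    standalone, // submit to a standalone Flink cluster
    yarn,       // submit to an existing Flink cluster on yarn
    yarnPer     // yarn per_job: create a new Flink application
}
```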

docs/kafkaSource.md

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ CREATE TABLE tableName(
 |bootstrapServers | kafka bootstrap-server addresses (separate multiple entries with commas)|||
 |zookeeperQuorum | kafka zk addresses (separate multiple entries with commas)|||
 |topic | name of the topic to read|||
-|offsetReset | initial offset position in the topic [latest\|earliest]||latest|
+|offsetReset | initial offset position in the topic [latest\|earliest\|specific offsets ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]||latest|
 |parallelism | parallelism setting||1|

 ## 5. Sample:
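
With the new specific-offset form, a source table pinned to per-partition start offsets might be declared as below. This is a sketch mirroring the README's kafka09 sample; the table name, topic, and addresses are illustrative:

```
CREATE TABLE MyTable(
    name varchar,
    channel varchar
)WITH(
    type ='kafka09',
    bootstrapServers ='172.16.8.198:9092',
    offsetReset ='{"0":12312,"1":12321,"2":12312}',
    topic ='nbTest1',
    parallelism ='1'
);
```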

docs/prometheus.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+## confProp parameters required to use prometheus pushgateway
+* metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+* metrics.reporter.promgateway.host: address of the prometheus pushgateway
+* metrics.reporter.promgateway.port: port of the prometheus pushgateway
+* metrics.reporter.promgateway.jobName: instance name
+* metrics.reporter.promgateway.randomJobNameSuffix: whether to append a random string to the instance name (default: true)
+* metrics.reporter.promgateway.deleteOnShutdown: whether to delete the data on shutdown (default: false)
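
Passed via -confProp on the launch command, the full reporter configuration might look like the following sketch; the host, port, and jobName values are illustrative:

```
-confProp \{\"metrics.reporter.promgateway.class\":\"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter\",\"metrics.reporter.promgateway.host\":\"127.0.0.1\",\"metrics.reporter.promgateway.port\":\"9091\",\"metrics.reporter.promgateway.jobName\":\"xctest\",\"metrics.reporter.promgateway.randomJobNameSuffix\":\"true\",\"metrics.reporter.promgateway.deleteOnShutdown\":\"false\"\}
```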

hbase/hbase-side/hbase-all-side/pom.xml

Lines changed: 5 additions & 1 deletion
@@ -29,6 +29,10 @@
             <artifactId>log4j</artifactId>
             <groupId>log4j</groupId>
         </exclusion>
+        <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+        </exclusion>
     </exclusions>
 </dependency>
 </dependencies>
@@ -51,7 +55,7 @@
     <exclude>org.apache.hadoop:hadoop-common</exclude>
     <exclude>org.apache.hadoop:hadoop-auth</exclude>
     <exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
-    <exclude>org.slf4j</exclude>
+    <exclude>org.slf4j:*</exclude>
 </excludes>
 </artifactSet>
 <filters>

hbase/hbase-side/hbase-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@
     <exclude>org.apache.hadoop:hadoop-common</exclude>
     <exclude>org.apache.hadoop:hadoop-auth</exclude>
     <exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
-    <exclude>org.slf4j</exclude>
+    <exclude>org.slf4j:*</exclude>
 </excludes>
 </artifactSet>
 <filters>

hbase/hbase-sink/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@
 <configuration>
     <artifactSet>
         <excludes>
-            <exclude>org.slf4j:slf4j-log4j12</exclude>
+            <exclude>org.slf4j:*</exclude>
             <exclude>org.apache.hadoop:hadoop-common</exclude>
             <exclude>org.apache.hadoop:hadoop-auth</exclude>
             <exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 8 additions & 0 deletions
@@ -32,6 +32,8 @@
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.text.SimpleDateFormat;
@@ -44,6 +46,8 @@
  */
 public class HbaseOutputFormat extends RichOutputFormat<Tuple2> {

+    private static final Logger LOG = LoggerFactory.getLogger(HbaseOutputFormat.class);
+
     private String host;
     private String zkParent;
     private String[] rowkey;
@@ -63,17 +67,21 @@ public class HbaseOutputFormat extends RichOutputFormat<Tuple2> {

     @Override
     public void configure(Configuration parameters) {
+        LOG.warn("---configure---");
         conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", host);
         if(zkParent != null && !"".equals(zkParent)){
             conf.set("zookeeper.znode.parent", zkParent);
         }
+        LOG.warn("---configure end ---");
     }

     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
+        LOG.warn("---open---");
         conn = ConnectionFactory.createConnection(conf);
         table = conn.getTable(TableName.valueOf(tableName));
+        LOG.warn("---open end(get table from hbase) ---");
     }

     @Override

hbase/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -43,6 +43,10 @@
             <artifactId>log4j</artifactId>
             <groupId>log4j</groupId>
         </exclusion>
+        <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+        </exclusion>
     </exclusions>
 </dependency>

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 18 additions & 1 deletion
@@ -23,16 +23,20 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;

 /**
@@ -76,7 +80,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
-        }else{
+        }else if(kafka09SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka09SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String, Object> entry : offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka09SourceTableInfo.getOffsetReset());
+            }
+        }else {
             kafkaSrc.setStartFromLatest();
         }
