Skip to content

Commit e346a80

Browse files
author
xuchao
committed
Merge branch '1.8_release_3.10.x' into v1.10.0_dev
# Conflicts: # cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java # core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java # core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java # core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java # elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java # hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java # kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java # kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java # mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java # redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
2 parents ea1babb + 828062e commit e346a80

145 files changed

Lines changed: 7361 additions & 3330 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 34 additions & 307 deletions
Original file line numberDiff line numberDiff line change
@@ -1,309 +1,36 @@
1-
# flinkStreamSQL
2-
> * 基于开源的flink,对其实时sql进行扩展
3-
> > * 自定义create table 语法(包括源表,输出表,维表)
4-
> > * 自定义create view 语法
5-
> > * 自定义create function 语法
6-
> > * 实现了流与维表的join
7-
> > * 支持原生FLinkSQL所有的语法
8-
> > * 扩展了输入和输出的性能指标到promethus
9-
10-
# 已支持
11-
* 源表:kafka 0.9、0.10、0.11、1.x版本
12-
* 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver
13-
* 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver
14-
15-
## 1 快速起步
16-
### 1.1 运行模式
17-
18-
19-
* 单机模式:对应Flink集群的单机模式
20-
* standalone模式:对应Flink集群的分布式模式
21-
* yarn模式:对应Flink集群的yarn模式
22-
23-
### 1.2 执行环境
24-
25-
* Java: JDK8及以上
26-
* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群)
27-
* 操作系统:理论上不限
28-
* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
29-
```
30-
## hadoop配置文件路径
31-
fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
32-
security.kerberos.login.use-ticket-cache: true
33-
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab
34-
security.kerberos.login.principal: maqi@DTSTACK.COM
35-
security.kerberos.login.contexts: Client,KafkaClient
36-
zookeeper.sasl.service-name: zookeeper
37-
zookeeper.sasl.login-context-name: Client
38-
39-
```
40-
41-
### 1.3 打包
42-
43-
进入项目根目录,使用maven打包:
44-
45-
```
46-
mvn clean package -Dmaven.test.skip
47-
48-
```
49-
50-
打包完成后的包结构:
51-
52-
> * dt-center-flinkStreamSQL
53-
> > * bin: 任务启动脚本
54-
> > * lib: launcher包存储路径,是任务提交的入口
55-
> > * plugins: 插件包存储路径
56-
> > * ........ : core及插件代码
57-
58-
### 1.4 启动
59-
60-
#### 1.4.1 启动命令
61-
62-
```
63-
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"}
64-
```
65-
66-
#### 1.4.2 命令行参数选项
67-
68-
* **mode**
69-
* 描述:执行模式,也就是flink集群的工作模式
70-
* local: 本地模式
71-
* standalone: 提交到独立部署模式的flink集群
72-
* yarn: 提交到yarn模式的flink集群(即提交到已有flink集群)
73-
* yarnPer: yarn per_job模式提交(即创建新flink application)
74-
* 必选:否
75-
* 默认值:local
76-
77-
* **name**
78-
* 描述:flink 任务对应名称。
79-
* 必选:是
80-
* 默认值:无
81-
82-
* **sql**
83-
* 描述:执行flink sql 的主体语句。
84-
* 必选:是
85-
* 默认值:无
86-
87-
* **localSqlPluginPath**
88-
* 描述:本地插件根目录地址,也就是打包后产生的plugins目录。
89-
* 必选:是
90-
* 默认值:无
91-
92-
* **remoteSqlPluginPath**
93-
* 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。
94-
* 必选:否
95-
* 默认值:无
96-
97-
* **addjar**
98-
* 描述:扩展jar路径,当前主要是UDF定义的jar;
99-
* 格式:json
100-
* 必选:否
101-
* 默认值:无
1+
FlinkStreamSQL
2+
============
3+
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
4+
5+
## 技术交流
6+
- 招聘**大数据平台开发工程师**,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至[sishu@dtstack.com](mailto:sishu@dtstack.com)
7+
- 我们使用[钉钉](https://www.dingtalk.com/)沟通交流,可以搜索群号[**30537511**]或者扫描下面的二维码进入钉钉群
8+
<div align=center>
9+
<img src=docs/images/streamsql_dd.jpg width=300 />
10+
</div>
11+
12+
## 介绍
13+
* 基于开源的flink,对其实时sql进行扩展
14+
* 自定义create table 语法(包括源表,输出表,维表)
15+
* 自定义create view 语法
16+
* 自定义create function 语法
17+
* 实现了流与维表的join
18+
* 支持原生FlinkSQL所有的语法
19+
* 扩展了输入和输出的性能指标到Task metrics
10220

103-
* **confProp**
104-
* 描述:一些参数设置
105-
* 格式: json
106-
* 必选:是 (如无参数填写空json即可)
107-
* 默认值:无
108-
* 可选参数:
109-
* sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒)
110-
* sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟
111-
* state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。
112-
* state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。
113-
* state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。
114-
* sql.env.parallelism: 默认并行度设置
115-
* sql.max.env.parallelism: 最大并行度设置
116-
* time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime]
117-
* sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms)
118-
* sql.checkpoint.mode: 可选值[EXACTLY_ONCE|AT_LEAST_ONCE]
119-
* sql.checkpoint.timeout: 生成checkpoint的超时时间(ms)
120-
* sql.max.concurrent.checkpoints: 最大并发生成checkpoint数
121-
* sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)]
122-
* flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file://
123-
* jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768)
124-
* taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768)
125-
* taskmanager.num: per_job模式下指定taskmanager的实例数(默认1)
126-
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
127-
* savePointPath:任务恢复点的路径(默认无)
128-
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
129-
* restore.enable:是否失败重启(默认是true)
130-
* failure.interval:衡量失败率的时间段,单位分钟(默认6m)
131-
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
132-
* logLevel: 日志级别动态配置(默认info)
133-
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
21+
## 目录
22+
23+
[ 1.1 demo](docs/demo.md)
24+
[ 1.2 快速开始](docs/quickStart.md)
25+
[ 1.3 参数配置](docs/config.md)
26+
[ 1.4 支持的插件介绍和demo](docs/pluginsInfo.md)
27+
[ 1.5 指标参数](docs/newMetric.md)
28+
[ 1.6 自定义函数](docs/function.md)
29+
[ 1.7 自定义视图](docs/createView.md)
30+
31+
## 如何贡献FlinkStreamSQL
13432

135-
136-
* **flinkconf**
137-
* 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
138-
* 必选:否
139-
* 默认值:无
140-
141-
* **yarnconf**
142-
* 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
143-
* 必选:否
144-
* 默认值:无
145-
146-
* **flinkJarPath**
147-
* 描述:per_job 模式提交需要指定本地的flink jar存放路径
148-
* 必选:否
149-
* 默认值:false
150-
151-
* **queue**
152-
* 描述:per_job 模式下指定的yarn queue
153-
* 必选:否
154-
* 默认值:false
155-
156-
* **pluginLoadMode**
157-
* 描述:per_job 模式下的插件包加载方式。classpath:从每台机器加载插件包,shipfile:将需要插件从提交的节点上传到hdfs,不需要每台安装插件
158-
* 必选:否
159-
* 默认值:classpath
160-
161-
* **yarnSessionConf**
162-
* 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid
163-
* 必选:否
164-
* 默认值:false
165-
166-
167-
## 2 结构
168-
### 2.1 源表插件
169-
* [kafka 源表插件](docs/kafkaSource.md)
170-
171-
### 2.2 结果表插件
172-
* [elasticsearch 结果表插件](docs/elasticsearchSink.md)
173-
* [hbase 结果表插件](docs/hbaseSink.md)
174-
* [mysql 结果表插件](docs/mysqlSink.md)
175-
* [oracle 结果表插件](docs/oracleSink.md)
176-
* [mongo 结果表插件](docs/mongoSink.md)
177-
* [redis 结果表插件](docs/redisSink.md)
178-
* [cassandra 结果表插件](docs/cassandraSink.md)
179-
* [kudu 结果表插件](docs/kuduSink.md)
180-
* [postgresql 结果表插件](docs/postgresqlSink.md)
181-
* [clickhouse 结果表插件](docs/clickhouseSink.md)
182-
* [impala 结果表插件](docs/impalaSink.md)
183-
* [db2 结果表插件](docs/db2Sink.md)
184-
* [sqlserver 结果表插件](docs/sqlserverSink.md)
185-
* [kafka 结果表插件](docs/kafkaSink.md)
186-
187-
### 2.3 维表插件
188-
* [hbase 维表插件](docs/hbaseSide.md)
189-
* [mysql 维表插件](docs/mysqlSide.md)
190-
* [oracle 维表插件](docs/oracleSide.md)
191-
* [mongo 维表插件](docs/mongoSide.md)
192-
* [redis 维表插件](docs/redisSide.md)
193-
* [cassandra 维表插件](docs/cassandraSide.md)
194-
* [kudu 维表插件](docs/kuduSide.md)
195-
* [postgresql 维表插件](docs/postgresqlSide.md)
196-
* [clickhouse 维表插件](docs/clickhouseSide.md)
197-
* [impala 维表插件](docs/impalaSide.md)
198-
* [db2 维表插件](docs/db2Side.md)
199-
* [sqlserver 维表插件](docs/sqlserverSide.md)
200-
201-
## 3 性能指标(新增)
202-
203-
### kafka插件
204-
* 业务延迟: flink_taskmanager_job_task_operator_dtEventDelay(单位s)
205-
数据本身的时间和进入flink的当前时间的差值.
206-
207-
* 各个输入源的脏数据:flink_taskmanager_job_task_operator_dtDirtyData
208-
从kafka获取的数据解析失败的视为脏数据
209-
210-
* 各Source的数据输入TPS: flink_taskmanager_job_task_operator_dtNumRecordsInRate
211-
kafka接受的记录数(未解析前)/s
212-
213-
* 各Source的数据输入RPS: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate
214-
kafka接受的记录数(解析后)/s
215-
216-
* 各Source的数据输入BPS: flink_taskmanager_job_task_operator_dtNumBytesInRate
217-
kafka接受的字节数/s
218-
219-
* Kafka作为输入源的各个分区的延迟数: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag
220-
当前kafka10,kafka11有采集该指标
221-
222-
* 各个输出源RPS: flink_taskmanager_job_task_operator_dtNumRecordsOutRate
223-
写入的外部记录数/s
224-
225-
226-
## 4 样例
227-
228-
```
229-
230-
CREATE (scala|table|aggregate) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun;
231-
232-
233-
CREATE TABLE MyTable(
234-
name varchar,
235-
channel varchar,
236-
pv int,
237-
xctime bigint,
238-
CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数
239-
)WITH(
240-
type ='kafka09',
241-
bootstrapServers ='172.16.8.198:9092',
242-
zookeeperQuorum ='172.16.8.198:2181/kafka',
243-
offsetReset ='latest',
244-
topic ='nbTest1',
245-
parallelism ='1'
246-
);
247-
248-
CREATE TABLE MyResult(
249-
channel varchar,
250-
pv varchar
251-
)WITH(
252-
type ='mysql',
253-
url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
254-
userName ='dtstack',
255-
password ='abc123',
256-
tableName ='pv2',
257-
parallelism ='1'
258-
);
259-
260-
CREATE TABLE workerinfo(
261-
cast(logtime as TIMESTAMP) AS rtime,
262-
cast(logtime) AS rtime
263-
)WITH(
264-
type ='hbase',
265-
zookeeperQuorum ='rdos1:2181',
266-
tableName ='workerinfo',
267-
rowKey ='ce,de',
268-
parallelism ='1',
269-
zookeeperParent ='/hbase'
270-
);
271-
272-
CREATE TABLE sideTable(
273-
cf:name varchar as name,
274-
cf:info varchar as info,
275-
PRIMARY KEY(name),
276-
PERIOD FOR SYSTEM_TIME //维表标识
277-
)WITH(
278-
type ='hbase',
279-
zookeeperQuorum ='rdos1:2181',
280-
zookeeperParent ='/hbase',
281-
tableName ='workerinfo',
282-
cache ='LRU',
283-
cacheSize ='10000',
284-
cacheTTLMs ='60000',
285-
parallelism ='1'
286-
);
287-
288-
insert
289-
into
290-
MyResult
291-
select
292-
d.channel,
293-
d.info
294-
from
295-
( select
296-
a.*,b.info
297-
from
298-
MyTable a
299-
join
300-
sideTable b
301-
on a.channel=b.name
302-
where
303-
a.channel = 'xc2'
304-
and a.pv=10 ) as d
305-
```
306-
307-
# 招聘
308-
1.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至sishu@dtstack.com
309-
33+
## License
34+
FlinkStreamSQL is under the Apache 2.0 license. See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) file for details.
35+
36+

0 commit comments

Comments
 (0)