Skip to content

Commit 582ea99

Browse files
author
yanxi0227
committed
Merge branch 'master' of github.com:DTStack/flinkStreamSQL into v1.5.0_dev
2 parents b138bac + bd3b789 commit 582ea99

4 files changed

Lines changed: 23 additions & 23 deletions

File tree

README.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
> > * 自定义create view 语法
55
> > * 自定义create function 语法
66
> > * 实现了流与维表的join
7+
> > * 支持原生FlinkSQL所有的语法
78
89
# 已支持
910
* 源表:kafka 0.9,1.x版本
@@ -14,6 +15,8 @@
1415
* 增加oracle维表,结果表功能
1516
* 增加SQLServer维表,结果表功能
1617
* 增加kafka结果表功能
18+
* 增加SQL支持CEP
19+
* 维表快照
1720

1821
## 1 快速起步
1922
### 1.1 运行模式
@@ -26,7 +29,7 @@
2629
### 1.2 执行环境
2730

2831
* Java: JDK8及以上
29-
* Flink集群: 1.4(单机模式不需要安装Flink集群)
32+
* Flink集群: 1.4,1.5(单机模式不需要安装Flink集群)
3033
* 操作系统:理论上不限
3134

3235
### 1.3 打包
@@ -44,12 +47,12 @@ mvn clean package -Dmaven.test.skip
4447
#### 1.4.1 启动命令
4548

4649
```
47-
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp {\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000}
50+
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
4851
```
4952

5053
#### 1.4.2 命令行参数选项
5154

52-
* **model**
55+
* **mode**
5356
* 描述:执行模式,也就是flink集群的工作模式
5457
* local: 本地模式
5558
* standalone: 提交到独立部署模式的flink集群
@@ -80,6 +83,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
8083

8184
* **addjar**
8285
* 描述:扩展jar路径,当前主要是UDF定义的jar;
86+
* 格式:json
8387
* 必选:否
8488
* 默认值:无
8589

@@ -180,12 +184,16 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
180184
## 4 样例
181185

182186
```
187+
188+
CREATE (scala|table) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun
189+
190+
183191
CREATE TABLE MyTable(
184192
name varchar,
185193
channel varchar,
186194
pv int,
187195
xctime bigint,
188-
CHARACTER_LENGTH(channel) AS timeLeng
196+
CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数
189197
)WITH(
190198
type ='kafka09',
191199
bootstrapServers ='172.16.8.198:9092',
@@ -223,7 +231,7 @@ CREATE TABLE sideTable(
223231
cf:name varchar as name,
224232
cf:info varchar as info,
225233
PRIMARY KEY(name),
226-
PERIOD FOR SYSTEM_TIME
234+
PERIOD FOR SYSTEM_TIME //维表标识
227235
)WITH(
228236
type ='hbase',
229237
zookeeperQuorum ='rdos1:2181',

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ public class Main {
9191

9292
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
9393

94-
private static final String LOCAL_MODE = "local";
95-
9694
private static final int failureRate = 3;
9795

9896
private static final int failureInterval = 6; //min
@@ -141,7 +139,7 @@ public static void main(String[] args) throws Exception {
141139
Thread.currentThread().setContextClassLoader(dtClassLoader);
142140

143141
URLClassLoader parentClassloader;
144-
if(!LOCAL_MODE.equals(deployMode)){
142+
if(!ClusterMode.local.name().equals(deployMode)){
145143
parentClassloader = (URLClassLoader) threadClassLoader.getParent();
146144
}else{
147145
parentClassloader = dtClassLoader;
@@ -313,7 +311,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
313311
}
314312

315313
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
316-
StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ?
314+
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
317315
StreamExecutionEnvironment.getExecutionEnvironment() :
318316
new MyLocalStreamEnvironment();
319317

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public MysqlAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
7171
super(new MysqlAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
7272
}
7373

74-
7574
@Override
7675
protected Row fillData(Row input, Object sideInput) {
7776
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
144144
List<String> value = async.keys(key + ":*").get();
145145
String[] values = value.toArray(new String[value.size()]);
146146
RedisFuture<List<KeyValue<String, String>>> future = async.mget(values);
147-
while (future.isDone()){
148-
try {
149-
List<KeyValue<String, String>> kvList = future.get();
150-
if (kvList.size() != 0){
151-
for (int i=0; i<kvList.size(); i++){
152-
String[] splitKeys = kvList.get(i).getKey().split(":");
147+
future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
148+
@Override
149+
public void accept(List<KeyValue<String, String>> keyValues) {
150+
if (keyValues.size() != 0){
151+
for (int i=0; i<keyValues.size(); i++){
152+
String[] splitKeys = keyValues.get(i).getKey().split(":");
153153
keyValue.put(splitKeys[1], splitKeys[2]);
154-
keyValue.put(splitKeys[3], kvList.get(i).getValue());
154+
keyValue.put(splitKeys[3], keyValues.get(i).getValue());
155155
}
156156
Row row = fillData(input, keyValue);
157157
resultFuture.complete(Collections.singleton(row));
@@ -164,13 +164,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
164164
putCache(key, CacheMissVal.getMissKeyObj());
165165
}
166166
}
167-
} catch (InterruptedException e1) {
168-
e1.printStackTrace();
169-
} catch (ExecutionException e1) {
170-
e1.printStackTrace();
171167
}
172-
}
173-
168+
});
174169
}
175170

176171
private String buildCacheKey(List<String> keyData) {

0 commit comments

Comments
 (0)