Skip to content

Commit 9189a73

Browse files
Merge branch 'v1.5.0_dev' of https://github.com/XuQianJin-Stars/flinkStreamSQL into v1.5.0_dev
2 parents b382bbe + 87c845f commit 9189a73

93 files changed

Lines changed: 4423 additions & 1360 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
> > * 自定义create function 语法
66
> > * 实现了流与维表的join
77
> > * 支持原生FLinkSQL所有的语法
8+
> > * 扩展了输入和输出的性能指标到promethus
9+
810

911
# 已支持
1012
* 源表:kafka 0.9,1.x版本
@@ -17,6 +19,7 @@
1719
* 增加kafka结果表功能
1820
* 增加SQL支持CEP
1921
* 维表快照
22+
* sql优化(谓词下移等)
2023

2124
## 1 快速起步
2225
### 1.1 运行模式
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.enums;
20+
21+
/**
22+
* Database type
23+
*
24+
* Company: www.dtstack.com
25+
* @author jiangbo
26+
*/
27+
public enum EDatabaseType {
28+
29+
MYSQL,
30+
SQLSERVER,
31+
ORACLE,
32+
33+
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.parser;
2222

23+
import com.dtstack.flink.sql.util.DtStringUtil;
2324
import org.apache.calcite.sql.*;
2425
import org.apache.calcite.sql.parser.SqlParseException;
2526
import org.apache.calcite.sql.parser.SqlParser;
@@ -77,7 +78,8 @@ public void parseSql(String sql, SqlTree sqlTree) {
7778
parseNode(sqlNode, sqlParseResult);
7879

7980
sqlParseResult.setTableName(tableName);
80-
sqlParseResult.setExecSql(selectSql.toUpperCase());
81+
String transformSelectSql = DtStringUtil.replaceIgnoreQuota(sqlNode.toString(), "`", "");
82+
sqlParseResult.setExecSql(transformSelectSql);
8183
sqlTree.addTmpSql(sqlParseResult);
8284
sqlTree.addTmplTableInfo(tableName, sqlParseResult);
8385
} else {

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* @author xuchao
3838
*/
3939

40-
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row>{
40+
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row> implements ISideReqRow {
4141

4242
protected SideInfo sideInfo;
4343

@@ -48,8 +48,6 @@ public AllReqRow(SideInfo sideInfo){
4848

4949
}
5050

51-
protected abstract Row fillData(Row input, Object sideInput);
52-
5351
protected abstract void initCache() throws SQLException;
5452

5553
protected abstract void reloadCache();

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* @author xuchao
4141
*/
4242

43-
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
43+
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
4444

4545
private static final long serialVersionUID = 2098635244857937717L;
4646

@@ -79,9 +79,6 @@ protected boolean openCache(){
7979
return sideInfo.getSideCache() != null;
8080
}
8181

82-
83-
protected abstract Row fillData(Row input, Object sideInput);
84-
8582
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
8683
if(sideInfo.getJoinType() == JoinType.LEFT){
8784
//Reserved left table data
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import org.apache.flink.types.Row;
22+
23+
/**
24+
*
25+
* Date: 2018/12/4
26+
* Company: www.dtstack.com
27+
* @author xuchao
28+
*/
29+
public interface ISideReqRow {
30+
31+
Row fillData(Row input, Object sideInput);
32+
33+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flink.sql.sink;
19+
20+
import com.dtstack.flink.sql.metric.MetricConstant;
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
23+
import org.apache.flink.api.common.io.RichOutputFormat;
24+
import org.apache.flink.metrics.Counter;
25+
import org.apache.flink.metrics.Meter;
26+
import org.apache.flink.metrics.MeterView;
27+
28+
/**
29+
* Created by sishu.yss on 2018/11/28.
30+
*/
31+
public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
32+
33+
protected transient Counter outRecords;
34+
35+
protected transient Meter outRecordsRate;
36+
37+
public void initMetric() {
38+
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
39+
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
40+
}
41+
42+
}

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.dtstack.flink.sql.util.MathUtil;
2626

2727
import java.util.Map;
28+
import java.util.regex.Matcher;
29+
import java.util.regex.Pattern;
2830

2931
/**
3032
* Reason:
@@ -35,6 +37,19 @@
3537

3638
public abstract class AbsSideTableParser extends AbsTableParser {
3739

40+
private final static String SIDE_SIGN_KEY = "sideSignKey";
41+
42+
private final static Pattern SIDE_TABLE_SIGN = Pattern.compile("(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$");
43+
44+
static {
45+
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
46+
keyHandlerMap.put(SIDE_SIGN_KEY, AbsSideTableParser::dealSideSign);
47+
}
48+
49+
private static void dealSideSign(Matcher matcher, TableInfo tableInfo){
50+
//FIXME SIDE_TABLE_SIGN current just used as a sign for side table; and do nothing
51+
}
52+
3853
//Analytical create table attributes ==> Get information cache
3954
protected void parseCacheProp(SideTableInfo sideTableInfo, Map<String, Object> props){
4055
if(props.containsKey(SideTableInfo.CACHE_KEY.toLowerCase())){

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2424

25+
import java.io.Serializable;
2526
import java.util.List;
2627

2728
/**
@@ -31,7 +32,7 @@
3132
* @author xuchao
3233
*/
3334

34-
public abstract class TableInfo {
35+
public abstract class TableInfo implements Serializable {
3536

3637
public static final String PARALLELISM_KEY = "parallelism";
3738

@@ -121,6 +122,29 @@ public void addFieldType(String fieldType){
121122
fieldTypeList.add(fieldType);
122123
}
123124

125+
public void setFields(String[] fields) {
126+
this.fields = fields;
127+
}
128+
129+
public void setFieldTypes(String[] fieldTypes) {
130+
this.fieldTypes = fieldTypes;
131+
}
132+
133+
public void setFieldClasses(Class<?>[] fieldClasses) {
134+
this.fieldClasses = fieldClasses;
135+
}
136+
137+
public List<String> getFieldList() {
138+
return fieldList;
139+
}
140+
141+
public List<String> getFieldTypeList() {
142+
return fieldTypeList;
143+
}
144+
145+
public List<Class> getFieldClassList() {
146+
return fieldClassList;
147+
}
124148

125149
public void finish(){
126150
this.fields = fieldList.toArray(new String[fieldList.size()]);

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
package com.dtstack.flink.sql.util;
2222

2323
import com.dtstack.flink.sql.enums.ColumnType;
24+
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2526
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2628

2729
import java.math.BigDecimal;
2830
import java.util.ArrayList;
@@ -42,6 +44,9 @@ public class DtStringUtil {
4244

4345
private static final Pattern NO_VERSION_PATTERN = Pattern.compile("([a-zA-Z]+).*");
4446

47+
private static ObjectMapper objectMapper = new ObjectMapper();
48+
49+
4550
/**
4651
* Split the specified string delimiter --- ignored quotes delimiter
4752
* @param str
@@ -207,4 +212,17 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
207212

208213
return preStr + "?" + sb.toString();
209214
}
215+
216+
public static boolean isJosn(String str){
217+
boolean flag = false;
218+
if(StringUtils.isNotBlank(str)){
219+
try {
220+
objectMapper.readValue(str,Map.class);
221+
flag = true;
222+
} catch (Throwable e) {
223+
flag=false;
224+
}
225+
}
226+
return flag;
227+
}
210228
}

0 commit comments

Comments
 (0)