Skip to content

Commit 8d09eee

Browse files
author
gituser
committed
Merge branch '1.10_release_4.1.x' into 1.10_test_4.1.x
2 parents 6372ac7 + 16ec3d7 commit 8d09eee

15 files changed

Lines changed: 127 additions & 18 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package com.dtstack.flink.sql.parser;
2020

2121
import com.dtstack.flink.sql.util.DtStringUtil;
22+
import com.google.common.base.Preconditions;
2223
import com.google.common.collect.Maps;
24+
import org.apache.commons.lang3.StringUtils;
2325

2426
import java.util.List;
2527
import java.util.Map;
@@ -71,8 +73,9 @@ private Map<String, Object> parseProp(String propsStr){
7173
Map<String, Object> propMap = Maps.newHashMap();
7274
for (String str : strings) {
7375
List<String> ss = DtStringUtil.splitIgnoreQuota(str, '=');
76+
Preconditions.checkState(ss.size() == 2, str + " Format error");
7477
String key = ss.get(0).trim();
75-
String value = ss.get(1).trim().replaceAll("'", "").trim();
78+
String value = DtStringUtil.removeStartAndEndQuota(ss.get(1).trim());
7679
propMap.put(key, value);
7780
}
7881

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.time.LocalDateTime;
3838
import java.util.Map;
3939
import java.util.TimeZone;
40+
import java.util.TimeZone;
4041
import java.util.concurrent.ScheduledExecutorService;
4142
import java.util.concurrent.ScheduledThreadPoolExecutor;
4243
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,7 @@
4546
* Reason:
4647
* Date: 2018/9/18
4748
* Company: www.dtstack.com
49+
*
4850
* @author xuchao
4951
*/
5052

@@ -86,7 +88,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8688

8789
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
8890
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
89-
obj = Timestamp.valueOf(((LocalDateTime) obj));
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9093
}
9194
return obj;
9295
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Collections;
5353
import java.util.List;
5454
import java.util.Map;
55+
import java.util.TimeZone;
5556
import java.util.concurrent.ScheduledFuture;
5657

5758
/**
@@ -71,6 +72,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<Row, BaseRow> im
7172
private int timeOutNum = 0;
7273
protected BaseSideInfo sideInfo;
7374
protected transient Counter parseErrorRecords;
75+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
7476

7577
public BaseAsyncReqRow(BaseSideInfo sideInfo) {
7678
this.sideInfo = sideInfo;
@@ -117,7 +119,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
117119

118120
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
119121
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
120-
obj = Timestamp.valueOf(((LocalDateTime) obj));
122+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
123+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
121124
}
122125
return obj;
123126
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSourceTableInfo.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.table;
2222

2323
import com.google.common.base.Strings;
24+
import com.google.common.collect.Lists;
2425
import com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class AbstractSourceTableInfo extends AbstractTableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY = "timezone";
44+
45+
private String timeZone = TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -101,4 +109,23 @@ public String getAdaptSelectSql(){
101109
public String getAdaptName(){
102110
return getName() + "_adapt";
103111
}
112+
113+
public String getTimeZone() {
114+
return timeZone;
115+
}
116+
117+
public void setTimeZone(String timeZone) {
118+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
119+
return;
120+
}
121+
timeZoneCheck(timeZone);
122+
this.timeZone = timeZone;
123+
}
124+
125+
private void timeZoneCheck(String timeZone) {
126+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
127+
if (!zones.contains(timeZone)){
128+
throw new IllegalArgumentException(" timezone is Incorrect!");
129+
}
130+
}
104131
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@
2323
import com.dtstack.flink.sql.util.ClassUtil;
2424
import com.dtstack.flink.sql.util.DtStringUtil;
2525
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.Lists;
2726
import com.google.common.collect.Maps;
2827
import org.apache.commons.lang3.StringUtils;
2928
import org.apache.flink.api.java.tuple.Tuple2;
3029

30+
import java.util.Arrays;
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.regex.Matcher;
3434
import java.util.regex.Pattern;
35+
import java.util.stream.Collectors;
3536

3637
/**
3738
* Reason:
@@ -46,7 +47,7 @@ public abstract class AbstractTableParser {
4647
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
4748
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4849

49-
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
50+
private static Pattern primaryKeyPattern = Pattern.compile("(?i)(^\\s*)PRIMARY\\s+KEY\\s*\\((.*)\\)");
5051
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
5152
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5253
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
@@ -142,9 +143,11 @@ private Tuple2<String, String> extractType(String fieldRow, String tableName) {
142143
}
143144

144145
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
145-
String primaryFields = matcher.group(1).trim();
146-
String[] splitArray = primaryFields.split(",");
147-
List<String> primaryKeys = Lists.newArrayList(splitArray);
146+
String primaryFields = matcher.group(2).trim();
147+
List<String> primaryKeys = Arrays
148+
.stream(primaryFields.split(","))
149+
.map(String::trim)
150+
.collect(Collectors.toList());;
148151
tableInfo.setPrimaryKeys(primaryKeys);
149152
}
150153

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,7 @@ public static String getTableFullPath(String schema, String tableName) {
373373
return addQuoteForStr(tableName);
374374
}
375375

376-
String schemaAndTabName = addQuoteForStr(schema) + "." + addQuoteForStr(tableName);
377-
return schemaAndTabName;
376+
return addQuoteForStr(schema) + "." + addQuoteForStr(tableName);
378377
}
379378

380379
/**
@@ -407,4 +406,9 @@ public static String getStartQuote() {
407406
public static String getEndQuote() {
408407
return "\"";
409408
}
409+
410+
public static String removeStartAndEndQuota(String str) {
411+
String removeStart = StringUtils.removeStart(str, "'");
412+
return StringUtils.removeEnd(removeStart, "'");
413+
}
410414
}

core/src/main/java/com/dtstack/flink/sql/watermarker/AbstractCustomerWaterMarker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
3030
import org.apache.flink.streaming.api.windowing.time.Time;
3131

32+
import java.util.TimeZone;
33+
3234
/**
3335
* Reason:
3436
* Date: 2018/10/18
@@ -51,6 +53,8 @@ public abstract class AbstractCustomerWaterMarker<T> extends BoundedOutOfOrderne
5153

5254
protected long lastTime = 0;
5355

56+
protected TimeZone timezone;
57+
5458
public AbstractCustomerWaterMarker(Time maxOutOfOrderness) {
5559
super(maxOutOfOrderness);
5660
}
@@ -98,7 +102,8 @@ public void setFromSourceTag(String fromSourceTag) {
98102

99103
protected long getExtractTimestamp(Long extractTime){
100104

101-
lastTime = extractTime;
105+
lastTime = extractTime + timezone.getOffset(extractTime);
106+
102107
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
103108

104109
return lastTime;

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import java.util.TimeZone;
30+
2931
/**
3032
* Custom watermark --- for eventtime
3133
* Date: 2017/12/28
@@ -37,9 +39,10 @@ public class CustomerWaterMarkerForLong extends AbstractCustomerWaterMarker<Row>
3739

3840
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
3941

40-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
42+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos, String timezone) {
4143
super(maxOutOfOrderness);
4244
this.pos = pos;
45+
this.timezone= TimeZone.getTimeZone(timezone);
4346
}
4447

4548
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.watermarker;
2222

@@ -39,9 +39,10 @@ public class CustomerWaterMarkerForTimeStamp extends AbstractCustomerWaterMarker
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4141

42-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
42+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
4343
super(maxOutOfOrderness);
4444
this.pos = pos;
45+
this.timezone= TimeZone.getTimeZone(timezone);
4546
}
4647

4748
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
4848

4949
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5050

51+
String timeZone = sourceTableInfo.getTimeZone();
52+
5153
String[] fieldNames = typeInfo.getFieldNames();
5254
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5355

@@ -69,9 +71,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
6971

7072
AbstractCustomerWaterMarker waterMarker = null;
7173
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
72-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
74+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos, timeZone);
7375
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
74-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
76+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos, timeZone);
7577
}else{
7678
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
7779
}

0 commit comments

Comments
 (0)