Skip to content

Commit 3667dde

Browse files
committed
Merge branch 'feat_1.10_dealTimeZone' into 'v1.10.0_dev'
Feat 1.10 deal time zone See merge request dt-insight-engine/flinkStreamSQL!70
2 parents 63cde11 + 85640b5 commit 3667dde

7 files changed

Lines changed: 22 additions & 50 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,8 @@
7070
import java.net.URL;
7171
import java.net.URLClassLoader;
7272
import java.net.URLDecoder;
73-
import java.util.Arrays;
74-
import java.util.List;
75-
import java.util.Map;
76-
import java.util.Properties;
77-
import java.util.Set;
73+
import java.time.ZoneId;
74+
import java.util.*;
7875

7976
/**
8077
* 任务执行时的流程方法
@@ -88,6 +85,8 @@ public class ExecuteProcessHelper {
8885
private static final Logger LOG = LoggerFactory.getLogger(ExecuteProcessHelper.class);
8986
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
9087

88+
private static final String TIME_ZONE = "timezone";
89+
9190
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
9291

9392
public static ParamsInfo parseParams(String[] args) throws Exception {
@@ -358,9 +357,21 @@ public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironmen
358357
.inStreamingMode()
359358
.build();
360359

361-
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());
360+
TableConfig tableConfig = new TableConfig();
361+
362+
timeZoneCheck(confProperties.getProperty(TIME_ZONE));
363+
364+
tableConfig.setLocalTimeZone(ZoneId.of(confProperties.getProperty(TIME_ZONE)));
365+
366+
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, tableConfig);
362367
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
363368
return tableEnv;
364369
}
365370

371+
private static void timeZoneCheck(String timeZone) {
372+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
373+
if (!zones.contains(timeZone)){
374+
throw new IllegalArgumentException(" timezone is Incorrect!");
375+
}
376+
}
366377
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSourceTableInfo.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,9 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import com.google.common.base.Strings;
24-
import com.google.common.collect.Lists;
2524
import com.google.common.collect.Maps;
26-
import org.apache.flink.util.StringUtils;
2725

28-
import java.util.ArrayList;
2926
import java.util.Map;
30-
import java.util.TimeZone;
3127

3228
/**
3329
* Reason:
@@ -40,10 +36,6 @@ public abstract class AbstractSourceTableInfo extends AbstractTableInfo {
4036

4137
public static final String SOURCE_SUFFIX = "Source";
4238

43-
public static final String TIME_ZONE_KEY="timezone";
44-
45-
private String timeZone=TimeZone.getDefault().getID();
46-
4739
private String eventTimeField;
4840

4941
private Integer maxOutOrderness = 10;
@@ -109,23 +101,4 @@ public String getAdaptSelectSql(){
109101
public String getAdaptName(){
110102
return getName() + "_adapt";
111103
}
112-
113-
public String getTimeZone() {
114-
return timeZone;
115-
}
116-
117-
public void setTimeZone(String timeZone) {
118-
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
119-
return;
120-
}
121-
timeZoneCheck(timeZone);
122-
this.timeZone = timeZone;
123-
}
124-
125-
private void timeZoneCheck(String timeZone) {
126-
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
127-
if (!zones.contains(timeZone)){
128-
throw new IllegalArgumentException(" timezone is Incorrect!");
129-
}
130-
}
131104
}

core/src/main/java/com/dtstack/flink/sql/watermarker/AbstractCustomerWaterMarker.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
3030
import org.apache.flink.streaming.api.windowing.time.Time;
3131

32-
import java.util.TimeZone;
33-
3432
/**
3533
* Reason:
3634
* Date: 2018/10/18
@@ -53,8 +51,6 @@ public abstract class AbstractCustomerWaterMarker<T> extends BoundedOutOfOrderne
5351

5452
protected long lastTime = 0;
5553

56-
protected TimeZone timezone;
57-
5854
public AbstractCustomerWaterMarker(Time maxOutOfOrderness) {
5955
super(maxOutOfOrderness);
6056
}
@@ -102,8 +98,7 @@ public void setFromSourceTag(String fromSourceTag) {
10298

10399
protected long getExtractTimestamp(Long extractTime){
104100

105-
lastTime = extractTime + timezone.getOffset(extractTime);
106-
101+
lastTime = extractTime;
107102
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
108103

109104
return lastTime;

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29-
import java.util.TimeZone;
30-
3129
/**
3230
* Custom watermark --- for eventtime
3331
* Date: 2017/12/28
@@ -39,10 +37,9 @@ public class CustomerWaterMarkerForLong extends AbstractCustomerWaterMarker<Row>
3937

4038
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
4139

42-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
40+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
4341
super(maxOutOfOrderness);
4442
this.pos = pos;
45-
this.timezone= TimeZone.getTimeZone(timezone);
4643
}
4744

4845
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,9 @@ public class CustomerWaterMarkerForTimeStamp extends AbstractCustomerWaterMarker
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4141

42-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
42+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
4343
super(maxOutOfOrderness);
4444
this.pos = pos;
45-
this.timezone= TimeZone.getTimeZone(timezone);
4645
}
4746

4847
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
4848

4949
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5050

51-
String timeZone=sourceTableInfo.getTimeZone();
52-
5351
String[] fieldNames = typeInfo.getFieldNames();
5452
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5553

@@ -71,9 +69,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7169

7270
AbstractCustomerWaterMarker waterMarker = null;
7371
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
74-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
72+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
7573
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
76-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
74+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
7775
}else{
7876
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
7977
}

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5050
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
5151
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
5252
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
53-
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5453

5554
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
5655
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));

0 commit comments

Comments
 (0)