Skip to content

Commit 7b4fd9d

Browse files
committed
【opt】opt timezone
1 parent 622b4b8 commit 7b4fd9d

7 files changed

Lines changed: 13 additions & 42 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,15 @@
2525
import com.dtstack.flink.sql.parser.SqlParser;
2626
import com.dtstack.flink.sql.parser.SqlTree;
2727
import org.apache.flink.api.common.typeinfo.TypeInformation;
28-
import org.apache.flink.api.java.tuple.Tuple2;
2928
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3029
import org.apache.flink.streaming.api.datastream.DataStream;
3130
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3231
import org.apache.flink.table.api.*;
33-
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
3432
import org.apache.flink.table.api.java.StreamTableEnvironment;
3533
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
3634
import org.apache.flink.table.sinks.TableSink;
37-
import org.apache.flink.types.Row;
3835

3936
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
40-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
4137
import com.dtstack.flink.sql.enums.ClusterMode;
4238
import com.dtstack.flink.sql.enums.ECacheType;
4339
import com.dtstack.flink.sql.enums.EPluginLoadMode;
@@ -74,6 +70,7 @@
7470
import java.net.URL;
7571
import java.net.URLClassLoader;
7672
import java.net.URLDecoder;
73+
import java.time.ZoneId;
7774
import java.util.Arrays;
7875
import java.util.List;
7976
import java.util.Map;
@@ -92,6 +89,8 @@ public class ExecuteProcessHelper {
9289
private static final Logger LOG = LoggerFactory.getLogger(ExecuteProcessHelper.class);
9390
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
9491

92+
private static final String TIME_ZONE = "timezone";
93+
9594
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
9695

9796
public static ParamsInfo parseParams(String[] args) throws Exception {
@@ -108,7 +107,6 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
108107
String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
109108
String pluginLoadMode = options.getPluginLoadMode();
110109
String deployMode = options.getMode();
111-
String logLevel = options.getLogLevel();
112110

113111
Preconditions.checkArgument(checkRemoteSqlPluginPath(remoteSqlPluginPath, deployMode, pluginLoadMode),
114112
"Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required");
@@ -358,7 +356,11 @@ private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironme
358356
.inStreamingMode()
359357
.build();
360358

361-
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());
359+
TableConfig tableConfig = new TableConfig();
360+
361+
tableConfig.setLocalTimeZone(ZoneId.of(confProperties.getProperty(TIME_ZONE)));
362+
363+
StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, tableConfig);
362364
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
363365
return tableEnv;
364366
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSourceTableInfo.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,6 @@ public abstract class AbstractSourceTableInfo extends AbstractTableInfo {
4040

4141
public static final String SOURCE_SUFFIX = "Source";
4242

43-
public static final String TIME_ZONE_KEY="timezone";
44-
45-
private String timeZone=TimeZone.getDefault().getID();
46-
4743
private String eventTimeField;
4844

4945
private Integer maxOutOrderness = 10;
@@ -109,23 +105,4 @@ public String getAdaptSelectSql(){
109105
public String getAdaptName(){
110106
return getName() + "_adapt";
111107
}
112-
113-
public String getTimeZone() {
114-
return timeZone;
115-
}
116-
117-
public void setTimeZone(String timeZone) {
118-
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
119-
return;
120-
}
121-
timeZoneCheck(timeZone);
122-
this.timeZone = timeZone;
123-
}
124-
125-
private void timeZoneCheck(String timeZone) {
126-
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
127-
if (!zones.contains(timeZone)){
128-
throw new IllegalArgumentException(" timezone is Incorrect!");
129-
}
130-
}
131108
}

core/src/main/java/com/dtstack/flink/sql/watermarker/AbstractCustomerWaterMarker.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ public abstract class AbstractCustomerWaterMarker<T> extends BoundedOutOfOrderne
5353

5454
protected long lastTime = 0;
5555

56-
protected TimeZone timezone;
57-
5856
public AbstractCustomerWaterMarker(Time maxOutOfOrderness) {
5957
super(maxOutOfOrderness);
6058
}
@@ -102,8 +100,7 @@ public void setFromSourceTag(String fromSourceTag) {
102100

103101
protected long getExtractTimestamp(Long extractTime){
104102

105-
lastTime = extractTime + timezone.getOffset(extractTime);
106-
103+
lastTime = extractTime;
107104
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
108105

109106
return lastTime;

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,9 @@ public class CustomerWaterMarkerForLong extends AbstractCustomerWaterMarker<Row>
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
4141

42-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
42+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
4343
super(maxOutOfOrderness);
4444
this.pos = pos;
45-
this.timezone= TimeZone.getTimeZone(timezone);
4645
}
4746

4847
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,9 @@ public class CustomerWaterMarkerForTimeStamp extends AbstractCustomerWaterMarker
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4141

42-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
42+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
4343
super(maxOutOfOrderness);
4444
this.pos = pos;
45-
this.timezone= TimeZone.getTimeZone(timezone);
4645
}
4746

4847
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
4848

4949
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5050

51-
String timeZone=sourceTableInfo.getTimeZone();
52-
5351
String[] fieldNames = typeInfo.getFieldNames();
5452
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5553

@@ -71,9 +69,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7169

7270
AbstractCustomerWaterMarker waterMarker = null;
7371
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
74-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
72+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
7573
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
76-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
74+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
7775
}else{
7876
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
7977
}

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5050
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
5151
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
5252
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));
53-
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5453

5554
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
5655
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));

0 commit comments

Comments
 (0)