Skip to content

Commit dfe3f6b

Browse files
committed
convert rowtime timezone
1 parent 3da8711 commit dfe3f6b

7 files changed

Lines changed: 67 additions & 14 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/table/SourceTableInfo.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
24+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2425
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
2526

27+
import java.util.ArrayList;
2628
import java.util.Map;
29+
import java.util.TimeZone;
2730

2831
/**
2932
* Reason:
@@ -36,6 +39,10 @@ public abstract class SourceTableInfo extends TableInfo {
3639

3740
public static final String SOURCE_SUFFIX = "Source";
3841

42+
public static final String TIME_ZONE_KEY="timezone";
43+
44+
private String timeZone="Asia/Shanghai";
45+
3946
private String eventTimeField;
4047

4148
private Integer maxOutOrderness = 10;
@@ -101,4 +108,23 @@ public String getAdaptSelectSql(){
101108
public String getAdaptName(){
102109
return getName() + "_adapt";
103110
}
111+
112+
public String getTimeZone() {
113+
return timeZone;
114+
}
115+
116+
public void setTimeZone(String timeZone) {
117+
if (timeZone==null){
118+
return;
119+
}
120+
timeZoneCheck(timeZone);
121+
this.timeZone = timeZone;
122+
}
123+
124+
private void timeZoneCheck(String timeZone) {
125+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
126+
if (!zones.contains(timeZone)){
127+
throw new IllegalArgumentException(" timezone is Incorrect!");
128+
}
129+
}
104130
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.watermarker;
2222

2323
import com.dtstack.flink.sql.util.MathUtil;
24-
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
2524
import org.apache.flink.streaming.api.windowing.time.Time;
2625
import org.apache.flink.types.Row;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

29+
import java.util.TimeZone;
30+
3031
/**
3132
* Custom watermark --- for eventtime
3233
* Date: 2017/12/28
@@ -44,23 +45,34 @@ public class CustomerWaterMarkerForLong extends AbsCustomerWaterMarker<Row> {
4445

4546
private long lastTime = 0;
4647

47-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
48+
private TimeZone timezone;
49+
50+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
4851
super(maxOutOfOrderness);
4952
this.pos = pos;
53+
this.timezone= TimeZone.getTimeZone(timezone);
5054
}
5155

5256
@Override
5357
public long extractTimestamp(Row row) {
5458

5559
try{
5660
Long eveTime = MathUtil.getLongVal(row.getField(pos));
57-
lastTime = eveTime;
58-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - eveTime)/1000));
59-
return eveTime;
61+
Long extractTime=eveTime;
62+
63+
lastTime = extractTime + timezone.getOffset(extractTime);
64+
65+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - convertTimeZone(extractTime))/1000));
66+
67+
return lastTime;
6068
}catch (Exception e){
6169
logger.error("", e);
6270
}
63-
6471
return lastTime;
6572
}
73+
74+
public long convertTimeZone(long evenTime){
75+
long res = evenTime - timezone.getOffset(evenTime) + TimeZone.getDefault().getOffset(evenTime);
76+
return res;
77+
}
6678
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.sql.Timestamp;
30+
import java.util.TimeZone;
3031

3132
/**
3233
* Custom watermark --- for eventtime
@@ -45,25 +46,34 @@ public class CustomerWaterMarkerForTimeStamp extends AbsCustomerWaterMarker<Row>
4546

4647
private long lastTime = 0;
4748

49+
private TimeZone timezone;
4850

49-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
51+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
5052
super(maxOutOfOrderness);
5153
this.pos = pos;
54+
this.timezone= TimeZone.getTimeZone(timezone);
5255
}
5356

5457
@Override
5558
public long extractTimestamp(Row row) {
5659
try {
5760
Timestamp time = (Timestamp) row.getField(pos);
58-
lastTime = time.getTime();
5961

60-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - time.getTime())/1000));
61-
return time.getTime();
62+
long extractTime=time.getTime();
63+
64+
lastTime = extractTime + timezone.getOffset(extractTime);
65+
66+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - convertTimeZone(extractTime))/1000));
67+
68+
return lastTime;
6269
} catch (RuntimeException e) {
6370
logger.error("", e);
6471
}
6572
return lastTime;
6673
}
6774

68-
75+
public long convertTimeZone(long evenTime){
76+
long res = evenTime - timezone.getOffset(evenTime) + TimeZone.getDefault().getOffset(evenTime);
77+
return res;
78+
}
6979
}

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
5454

5555
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5656

57+
String timeZone=sourceTableInfo.getTimeZone();
58+
5759
String[] fieldNames = typeInfo.getFieldNames();
5860
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5961

@@ -75,9 +77,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7577

7678
AbsCustomerWaterMarker waterMarker = null;
7779
if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.sql.Timestamp")){
78-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
80+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
7981
}else if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.lang.Long")){
80-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
82+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
8183
}else{
8284
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
8385
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4848
kafka09SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
4949
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
5050
kafka09SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
51+
kafka09SourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5152
kafka09SourceTableInfo.check();
5253
return kafka09SourceTableInfo;
5354
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4848
kafka10SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
4949
kafka10SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
5050
kafka10SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
51+
kafka10SourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5152
kafka10SourceTableInfo.check();
5253
return kafka10SourceTableInfo;
5354
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4848
kafka11SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
4949
kafka11SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
5050
kafka11SourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
51+
kafka11SourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5152
kafka11SourceTableInfo.check();
5253
return kafka11SourceTableInfo;
5354
}

0 commit comments

Comments
 (0)