Skip to content

Commit f3fae83

Browse files
committed
Merge branch 'v1.5.0_dev_watermarkTimezone' into 'v1.5.0_dev'
V1.5.0 dev watermark timezone 1.添加git-branch 2.eventtime调整时区 See merge request !10
2 parents f66e56e + f8b2798 commit f3fae83

35 files changed

Lines changed: 110 additions & 43 deletions

File tree

cassandra/cassandra-side/cassandra-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
</copy>
7777

7878
<move file="${basedir}/../../../plugins/cassandraallside/${project.artifactId}-${project.version}.jar"
79-
tofile="${basedir}/../../../plugins/cassandraallside/${project.name}.jar" />
79+
tofile="${basedir}/../../../plugins/cassandraallside/${project.name}-${git.branch}.jar" />
8080
</tasks>
8181
</configuration>
8282
</execution>

cassandra/cassandra-side/cassandra-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
</copy>
9393

9494
<move file="${basedir}/../../../plugins/cassandraasyncside/${project.artifactId}-${project.version}.jar"
95-
tofile="${basedir}/../../../plugins/cassandraasyncside/${project.name}.jar" />
95+
tofile="${basedir}/../../../plugins/cassandraasyncside/${project.name}-${git.branch}.jar" />
9696
</tasks>
9797
</configuration>
9898
</execution>

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
</fileset>
107107
</copy>
108108
<move file="${basedir}/../plugins/${project.artifactId}-${project.version}.jar"
109-
tofile="${basedir}/../plugins/${project.name}.jar" />
109+
tofile="${basedir}/../plugins/${project.name}-${git.branch}.jar" />
110110
</tasks>
111111
</configuration>
112112
</execution>

core/src/main/java/com/dtstack/flink/sql/table/SourceTableInfo.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
24+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2425
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class SourceTableInfo extends TableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY="timezone";
44+
45+
private String timeZone=TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -63,7 +71,6 @@ public void setMaxOutOrderness(Integer maxOutOrderness) {
6371
if(maxOutOrderness == null){
6472
return;
6573
}
66-
6774
this.maxOutOrderness = maxOutOrderness;
6875
}
6976

@@ -101,4 +108,23 @@ public String getAdaptSelectSql(){
101108
public String getAdaptName(){
102109
return getName() + "_adapt";
103110
}
111+
112+
public String getTimeZone() {
113+
return timeZone;
114+
}
115+
116+
public void setTimeZone(String timeZone) {
117+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
118+
return;
119+
}
120+
timeZoneCheck(timeZone);
121+
this.timeZone = timeZone;
122+
}
123+
124+
private void timeZoneCheck(String timeZone) {
125+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
126+
if (!zones.contains(timeZone)){
127+
throw new IllegalArgumentException(" timezone is Incorrect!");
128+
}
129+
}
104130
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.watermarker;
2222

2323
import com.dtstack.flink.sql.util.MathUtil;
24-
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
2524
import org.apache.flink.streaming.api.windowing.time.Time;
2625
import org.apache.flink.types.Row;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

29+
import java.util.TimeZone;
30+
3031
/**
3132
* Custom watermark --- for eventtime
3233
* Date: 2017/12/28
@@ -44,23 +45,29 @@ public class CustomerWaterMarkerForLong extends AbsCustomerWaterMarker<Row> {
4445

4546
private long lastTime = 0;
4647

47-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
48+
private TimeZone timezone;
49+
50+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
4851
super(maxOutOfOrderness);
4952
this.pos = pos;
53+
this.timezone= TimeZone.getTimeZone(timezone);
5054
}
5155

5256
@Override
5357
public long extractTimestamp(Row row) {
5458

5559
try{
56-
Long eveTime = MathUtil.getLongVal(row.getField(pos));
57-
lastTime = eveTime;
58-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - eveTime)/1000));
59-
return eveTime;
60+
Long extractTime = MathUtil.getLongVal(row.getField(pos));
61+
62+
lastTime = extractTime + timezone.getOffset(extractTime);
63+
64+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
65+
66+
return lastTime;
6067
}catch (Exception e){
6168
logger.error("", e);
6269
}
63-
6470
return lastTime;
6571
}
72+
6673
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.sql.Timestamp;
30+
import java.util.TimeZone;
3031

3132
/**
3233
* Custom watermark --- for eventtime
@@ -45,25 +46,30 @@ public class CustomerWaterMarkerForTimeStamp extends AbsCustomerWaterMarker<Row>
4546

4647
private long lastTime = 0;
4748

49+
private TimeZone timezone;
4850

49-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
51+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
5052
super(maxOutOfOrderness);
5153
this.pos = pos;
54+
this.timezone= TimeZone.getTimeZone(timezone);
5255
}
5356

5457
@Override
5558
public long extractTimestamp(Row row) {
5659
try {
5760
Timestamp time = (Timestamp) row.getField(pos);
58-
lastTime = time.getTime();
5961

60-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - time.getTime())/1000));
61-
return time.getTime();
62+
long extractTime=time.getTime();
63+
64+
lastTime = extractTime + timezone.getOffset(extractTime);
65+
66+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
67+
68+
return lastTime;
6269
} catch (RuntimeException e) {
6370
logger.error("", e);
6471
}
6572
return lastTime;
6673
}
6774

68-
6975
}

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
5454

5555
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5656

57+
String timeZone=sourceTableInfo.getTimeZone();
58+
5759
String[] fieldNames = typeInfo.getFieldNames();
5860
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5961

@@ -75,9 +77,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7577

7678
AbsCustomerWaterMarker waterMarker = null;
7779
if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.sql.Timestamp")){
78-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
80+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
7981
}else if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.lang.Long")){
80-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
82+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
8183
}else{
8284
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
8385
}

elasticsearch5/elasticsearch5-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
</copy>
8686

8787
<move file="${basedir}/../../plugins/elasticsearchsink/${project.artifactId}-${project.version}.jar"
88-
tofile="${basedir}/../../plugins/elasticsearchsink/${project.name}.jar" />
88+
tofile="${basedir}/../../plugins/elasticsearchsink/${project.name}-${git.branch}.jar" />
8989
</tasks>
9090
</configuration>
9191
</execution>

hbase/hbase-side/hbase-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393
</copy>
9494

9595
<move file="${basedir}/../../../plugins/hbaseallside/${project.artifactId}-${project.version}.jar"
96-
tofile="${basedir}/../../../plugins/hbaseallside/${project.name}.jar" />
96+
tofile="${basedir}/../../../plugins/hbaseallside/${project.name}-${git.branch}.jar" />
9797
</tasks>
9898
</configuration>
9999
</execution>

hbase/hbase-side/hbase-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
</copy>
9191

9292
<move file="${basedir}/../../../plugins/hbaseasyncside/${project.artifactId}-${project.version}.jar"
93-
tofile="${basedir}/../../../plugins/hbaseasyncside/${project.name}.jar" />
93+
tofile="${basedir}/../../../plugins/hbaseasyncside/${project.name}-${git.branch}.jar" />
9494
</tasks>
9595
</configuration>
9696
</execution>

0 commit comments

Comments
 (0)