Skip to content

Commit 6447744

Browse files
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 499d2ab + 2065fa6 commit 6447744

12 files changed

Lines changed: 104 additions & 10 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.time.LocalDateTime;
3838
import java.util.Map;
3939
import java.util.TimeZone;
40+
import java.util.TimeZone;
4041
import java.util.concurrent.ScheduledExecutorService;
4142
import java.util.concurrent.ScheduledThreadPoolExecutor;
4243
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,7 @@
4546
* Reason:
4647
* Date: 2018/9/18
4748
* Company: www.dtstack.com
49+
*
4850
* @author xuchao
4951
*/
5052

@@ -86,7 +88,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
8688

8789
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
8890
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
89-
obj = Timestamp.valueOf(((LocalDateTime) obj));
91+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
92+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
9093
}
9194
return obj;
9295
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Collections;
5353
import java.util.List;
5454
import java.util.Map;
55+
import java.util.TimeZone;
5556
import java.util.concurrent.ScheduledFuture;
5657

5758
/**
@@ -71,6 +72,7 @@ public abstract class BaseAsyncReqRow extends RichAsyncFunction<Row, BaseRow> im
7172
private int timeOutNum = 0;
7273
protected BaseSideInfo sideInfo;
7374
protected transient Counter parseErrorRecords;
75+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
7476

7577
public BaseAsyncReqRow(BaseSideInfo sideInfo) {
7678
this.sideInfo = sideInfo;
@@ -117,7 +119,8 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
117119

118120
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
119121
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
120-
obj = Timestamp.valueOf(((LocalDateTime) obj));
122+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
123+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
121124
}
122125
return obj;
123126
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractSourceTableInfo.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.table;
2222

2323
import com.google.common.base.Strings;
24+
import com.google.common.collect.Lists;
2425
import com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class AbstractSourceTableInfo extends AbstractTableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY = "timezone";
44+
45+
private String timeZone = TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -101,4 +109,23 @@ public String getAdaptSelectSql(){
101109
public String getAdaptName(){
102110
return getName() + "_adapt";
103111
}
112+
113+
public String getTimeZone() {
114+
return timeZone;
115+
}
116+
117+
public void setTimeZone(String timeZone) {
118+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
119+
return;
120+
}
121+
timeZoneCheck(timeZone);
122+
this.timeZone = timeZone;
123+
}
124+
125+
private void timeZoneCheck(String timeZone) {
126+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
127+
if (!zones.contains(timeZone)){
128+
throw new IllegalArgumentException(" timezone is Incorrect!");
129+
}
130+
}
104131
}

core/src/main/java/com/dtstack/flink/sql/watermarker/AbstractCustomerWaterMarker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
3030
import org.apache.flink.streaming.api.windowing.time.Time;
3131

32+
import java.util.TimeZone;
33+
3234
/**
3335
* Reason:
3436
* Date: 2018/10/18
@@ -51,6 +53,8 @@ public abstract class AbstractCustomerWaterMarker<T> extends BoundedOutOfOrderne
5153

5254
protected long lastTime = 0;
5355

56+
protected TimeZone timezone;
57+
5458
public AbstractCustomerWaterMarker(Time maxOutOfOrderness) {
5559
super(maxOutOfOrderness);
5660
}
@@ -98,7 +102,8 @@ public void setFromSourceTag(String fromSourceTag) {
98102

99103
protected long getExtractTimestamp(Long extractTime){
100104

101-
lastTime = extractTime;
105+
lastTime = extractTime + timezone.getOffset(extractTime);
106+
102107
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
103108

104109
return lastTime;

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import java.util.TimeZone;
30+
2931
/**
3032
* Custom watermark --- for eventtime
3133
* Date: 2017/12/28
@@ -37,9 +39,10 @@ public class CustomerWaterMarkerForLong extends AbstractCustomerWaterMarker<Row>
3739

3840
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForLong.class);
3941

40-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
42+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos, String timezone) {
4143
super(maxOutOfOrderness);
4244
this.pos = pos;
45+
this.timezone= TimeZone.getTimeZone(timezone);
4346
}
4447

4548
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.watermarker;
2222

@@ -39,9 +39,10 @@ public class CustomerWaterMarkerForTimeStamp extends AbstractCustomerWaterMarker
3939

4040
private static final Logger logger = LoggerFactory.getLogger(CustomerWaterMarkerForTimeStamp.class);
4141

42-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
42+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
4343
super(maxOutOfOrderness);
4444
this.pos = pos;
45+
this.timezone= TimeZone.getTimeZone(timezone);
4546
}
4647

4748
@Override

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
4848

4949
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5050

51+
String timeZone = sourceTableInfo.getTimeZone();
52+
5153
String[] fieldNames = typeInfo.getFieldNames();
5254
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5355

@@ -69,9 +71,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
6971

7072
AbstractCustomerWaterMarker waterMarker = null;
7173
if(fieldType.getTypeClass().isAssignableFrom(Timestamp.class)){
72-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
74+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos, timeZone);
7375
}else if(fieldType.getTypeClass().isAssignableFrom(Long.class)){
74-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
76+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos, timeZone);
7577
}else{
7678
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
7779
}

elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.commons.lang3.StringUtils;
3131
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3232
import org.apache.flink.table.dataformat.BaseRow;
33+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3334
import org.apache.flink.types.Row;
3435
import org.apache.flink.util.Collector;
3536
import org.elasticsearch.action.search.SearchRequest;
@@ -45,6 +46,7 @@
4546

4647
import java.io.IOException;
4748
import java.io.Serializable;
49+
import java.sql.Timestamp;
4850
import java.util.Calendar;
4951
import java.util.List;
5052
import java.util.Map;
@@ -93,6 +95,34 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
9395
}
9496
}
9597

98+
@Override
99+
public Row fillData(Row input, Object sideInput) {
100+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
101+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
102+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
103+
Object obj = input.getField(entry.getValue());
104+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
105+
106+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
107+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
108+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
109+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
110+
}
111+
112+
row.setField(entry.getKey(), obj);
113+
}
114+
115+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
116+
if (cacheInfo == null) {
117+
row.setField(entry.getKey(), null);
118+
} else {
119+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
120+
}
121+
}
122+
123+
return row;
124+
}
125+
96126
private String buildKey(List<Object> equalValList) {
97127
StringBuilder sb = new StringBuilder("");
98128
for (Object equalVal : equalValList) {

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ public Row fillData(Row input, Object sideInput) {
9999

100100
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
101101
if (obj instanceof LocalDateTime && isTimeIndicatorTypeInfo) {
102-
obj = Timestamp.valueOf(((LocalDateTime) obj));
102+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
103+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
103104
}
104105

105106
row.setField(entry.getKey(), obj);

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5656
kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
5757
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
5858
kafkaSourceTableInfo.setSourceDataType(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.SOURCE_DATA_TYPE_KEY.toLowerCase(), FormatType.DT_NEST.name())));
59+
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
5960

6061
if(props.containsKey(KafkaSourceTableInfo.TIMESTAMP_OFFSET.toLowerCase())){
6162
kafkaSourceTableInfo.setTimestampOffset(MathUtil.getLongVal(props.getOrDefault(KafkaSourceTableInfo.TIMESTAMP_OFFSET.toLowerCase(), System.currentTimeMillis())));

0 commit comments

Comments (0)