Skip to content

Commit 4fd1c06

Browse files
committed
Merge branch '1.10_test_4.1.x' into feat_1.10_impalaSinkKerberos_mergedTest
2 parents 0083555 + c40705b commit 4fd1c06

57 files changed

Lines changed: 1450 additions & 540 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,18 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21-
import com.datastax.driver.core.*;
21+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22+
import org.apache.flink.types.Row;
23+
import org.apache.flink.util.Collector;
24+
25+
import com.datastax.driver.core.Cluster;
26+
import com.datastax.driver.core.ConsistencyLevel;
27+
import com.datastax.driver.core.HostDistance;
28+
import com.datastax.driver.core.PoolingOptions;
29+
import com.datastax.driver.core.QueryOptions;
30+
import com.datastax.driver.core.ResultSet;
31+
import com.datastax.driver.core.Session;
32+
import com.datastax.driver.core.SocketOptions;
2233
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
2334
import com.datastax.driver.core.policies.RetryPolicy;
2435
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,10 +43,7 @@
3243
import org.apache.calcite.sql.JoinType;
3344
import org.apache.commons.collections.CollectionUtils;
3445
import org.apache.commons.lang3.StringUtils;
35-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3646
import org.apache.flink.table.dataformat.BaseRow;
37-
import org.apache.flink.types.Row;
38-
import org.apache.flink.util.Collector;
3947
import org.slf4j.Logger;
4048
import org.slf4j.LoggerFactory;
4149

@@ -72,27 +80,6 @@ public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
7280
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
7381
}
7482

75-
@Override
76-
public Row fillData(Row input, Object sideInput) {
77-
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
78-
Row row = new Row(sideInfo.getOutFieldInfoList().size());
79-
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
80-
Object obj = input.getField(entry.getValue());
81-
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
82-
row.setField(entry.getKey(), obj);
83-
}
84-
85-
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
86-
if (cacheInfo == null) {
87-
row.setField(entry.getKey(), null);
88-
} else {
89-
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
90-
}
91-
}
92-
93-
return row;
94-
}
95-
9683
@Override
9784
protected void initCache() throws SQLException {
9885
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -72,57 +72,4 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
7272
LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
7373
}
7474

75-
76-
@Override
77-
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
78-
if (sqlNode.getKind() != SqlKind.EQUALS) {
79-
throw new RuntimeException("not equal operator.");
80-
}
81-
82-
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
83-
SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
84-
85-
String leftTableName = left.getComponent(0).getSimple();
86-
String leftField = left.getComponent(1).getSimple();
87-
88-
String rightTableName = right.getComponent(0).getSimple();
89-
String rightField = right.getComponent(1).getSimple();
90-
91-
if (leftTableName.equalsIgnoreCase(sideTableName)) {
92-
equalFieldList.add(leftField);
93-
int equalFieldIndex = -1;
94-
for (int i = 0; i < getFieldNames().length; i++) {
95-
String fieldName = getFieldNames()[i];
96-
if (fieldName.equalsIgnoreCase(rightField)) {
97-
equalFieldIndex = i;
98-
}
99-
}
100-
if (equalFieldIndex == -1) {
101-
throw new RuntimeException("can't deal equal field: " + sqlNode);
102-
}
103-
104-
equalValIndex.add(equalFieldIndex);
105-
106-
} else if (rightTableName.equalsIgnoreCase(sideTableName)) {
107-
108-
equalFieldList.add(rightField);
109-
int equalFieldIndex = -1;
110-
for (int i = 0; i < getFieldNames().length; i++) {
111-
String fieldName = getFieldNames()[i];
112-
if (fieldName.equalsIgnoreCase(leftField)) {
113-
equalFieldIndex = i;
114-
}
115-
}
116-
if (equalFieldIndex == -1) {
117-
throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
118-
}
119-
120-
equalValIndex.add(equalFieldIndex);
121-
122-
} else {
123-
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
124-
}
125-
126-
}
127-
12875
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.constant;
20+
21+
/**
 * Constant parameter keys shared by plugins for Kerberos configuration.
 *
 * @program: flinkStreamSQL
 * @author: wuren
 * @create: 2020/09/15
 **/
public final class PluginParamConsts {

    /** Config key for the Kerberos principal. */
    public static final String PRINCIPAL = "principal";
    /** Config key for the Kerberos keytab file path. */
    public static final String KEYTAB = "keytab";
    /** Config key for the krb5.conf file path. */
    public static final String KRB5_CONF = "krb5conf";

    /** Constants holder — not meant to be instantiated. */
    private PluginParamConsts() {
        throw new AssertionError("No PluginParamConsts instances allowed");
    }
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.krb;
20+
21+
import com.google.common.base.Strings;
22+
23+
/**
 * Contract for table configurations that may carry Kerberos authentication
 * settings (principal, keytab, krb5.conf).
 *
 * <p>Kerberos is considered enabled only when ALL three parameters are set;
 * setting only some of them is treated as a configuration error.
 *
 * @program: flinkStreamSQL
 * @author: wuren
 * @create: 2020/09/15
 **/
public interface KerberosTable {

    String getPrincipal();

    void setPrincipal(String principal);

    String getKeytab();

    void setKeytab(String keytab);

    String getKrb5conf();

    void setKrb5conf(String krb5conf);

    boolean isEnableKrb();

    void setEnableKrb(boolean enableKrb);

    /**
     * Derives the enableKrb flag from the three Kerberos parameters:
     * all set -&gt; enabled, none set -&gt; disabled, otherwise fail fast.
     *
     * @throws RuntimeException if only a subset of the Kerberos params is set
     */
    default void judgeKrbEnable() {
        // Plain null/empty checks (equivalent to Guava's Strings.isNullOrEmpty)
        // keep this interface free of third-party dependencies.
        boolean principalSet = getPrincipal() != null && !getPrincipal().isEmpty();
        boolean keytabSet = getKeytab() != null && !getKeytab().isEmpty();
        boolean krb5Set = getKrb5conf() != null && !getKrb5conf().isEmpty();

        if (principalSet && keytabSet && krb5Set) {
            setEnableKrb(true);
        } else if (!principalSet && !keytabSet && !krb5Set) {
            setEnableKrb(false);
        } else {
            throw new RuntimeException("Missing kerberos parameter! all kerberos params must be set, or all kerberos params are not set");
        }
    }
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.util.RowDataComplete;
2424
import org.apache.calcite.sql.JoinType;
2525
import org.apache.flink.api.common.functions.RichFlatMapFunction;
26+
import org.apache.flink.api.common.typeinfo.TypeInformation;
2627
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.table.dataformat.BaseRow;
2829
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -34,6 +35,8 @@
3435
import java.sql.SQLException;
3536
import java.sql.Timestamp;
3637
import java.time.LocalDateTime;
38+
import java.util.Map;
39+
import java.util.TimeZone;
3740
import java.util.concurrent.ScheduledExecutorService;
3841
import java.util.concurrent.ScheduledThreadPoolExecutor;
3942
import java.util.concurrent.TimeUnit;
@@ -42,7 +45,6 @@
4245
* Reason:
4346
* Date: 2018/9/18
4447
* Company: www.dtstack.com
45-
*
4648
* @author xuchao
4749
*/
4850

@@ -52,6 +54,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Row, BaseRow> im
5254

5355
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
5456

57+
public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
58+
5559
protected BaseSideInfo sideInfo;
5660

5761
private ScheduledExecutorService es;
@@ -95,6 +99,45 @@ protected void sendOutputRow(Row value, Object sideInput, Collector<BaseRow> out
9599
RowDataComplete.collectRow(out, row);
96100
}
97101

102+
@Override
103+
public Row fillData(Row input, Object sideInput) {
104+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
105+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
106+
107+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
108+
// origin value
109+
Object obj = input.getField(entry.getValue());
110+
obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
111+
row.setField(entry.getKey(), obj);
112+
}
113+
114+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
115+
if (cacheInfo == null) {
116+
row.setField(entry.getKey(), null);
117+
} else {
118+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
119+
}
120+
}
121+
return row;
122+
}
123+
124+
/**
125+
* covert flink time attribute.Type information for indicating event or processing time.
126+
* However, it behaves like a regular SQL timestamp but is serialized as Long.
127+
*
128+
* @param entry
129+
* @param obj
130+
* @return
131+
*/
132+
protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
133+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
134+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
135+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
136+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
137+
}
138+
return obj;
139+
}
140+
98141
@Override
99142
public void close() throws Exception {
100143
if (null != es && !es.isShutdown()) {

0 commit comments

Comments
 (0)