Skip to content

Commit a314faa

Browse files
committed
add join constant support
1 parent 5a8422e commit a314faa

3 files changed

Lines changed: 103 additions & 36 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import org.apache.flink.api.common.functions.RichFlatMapFunction;
24+
import org.apache.flink.api.common.typeinfo.TypeInformation;
2425
import org.apache.flink.configuration.Configuration;
2526
import org.apache.flink.table.runtime.types.CRow;
27+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2628
import org.apache.flink.types.Row;
2729
import org.apache.flink.util.Collector;
2830
import org.slf4j.Logger;
@@ -32,6 +34,8 @@
3234
import org.apache.calcite.sql.JoinType;
3335

3436
import java.sql.SQLException;
37+
import java.sql.Timestamp;
38+
import java.util.Map;
3539
import java.util.TimeZone;
3640
import java.util.concurrent.ScheduledExecutorService;
3741
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -86,6 +90,37 @@ protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out)
8690
out.collect(new CRow(row, value.change()));
8791
}
8892

93+
@Override
94+
public Row fillData(Row input, Object sideInput) {
95+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
96+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
97+
98+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
99+
// origin value
100+
Object obj = input.getField(entry.getValue());
101+
obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
102+
row.setField(entry.getKey(), obj);
103+
}
104+
105+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
106+
if (cacheInfo == null) {
107+
row.setField(entry.getKey(), null);
108+
} else {
109+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
110+
}
111+
}
112+
return row;
113+
}
114+
115+
protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
116+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
117+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
118+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
119+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
120+
}
121+
return obj;
122+
}
123+
89124
@Override
90125
public void close() throws Exception {
91126
if (null != es && !es.isShutdown()) {

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ private Map<String, Object> parseInputParam(CRow input){
194194
return inputParams;
195195
}
196196

197+
private void constantField() {
198+
199+
}
200+
197201
protected boolean isUseCache(Map<String, Object> inputParams){
198202
return openCache() && getFromCache(buildCacheKey(inputParams)) != null;
199203
}

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,17 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
24-
import org.apache.calcite.sql.JoinType;
25-
import org.apache.calcite.sql.SqlBasicCall;
26-
import org.apache.calcite.sql.SqlIdentifier;
27-
import org.apache.calcite.sql.SqlKind;
28-
import org.apache.calcite.sql.SqlNode;
24+
import com.google.common.base.Preconditions;
25+
import com.google.common.collect.Sets;
26+
import org.apache.calcite.sql.*;
2927
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3028
import com.google.common.collect.Lists;
3129
import com.google.common.collect.Maps;
3230

3331
import java.io.Serializable;
3432
import java.util.List;
3533
import java.util.Map;
34+
import java.util.Set;
3635

3736
/**
3837
* Reason:
@@ -126,51 +125,80 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
126125
throw new RuntimeException("not compare operator.");
127126
}
128127

129-
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
130-
SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
128+
SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
129+
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
130+
if (leftNode.getKind() == SqlKind.LITERAL) {
131+
SqlLiteral literal = (SqlLiteral) leftNode;
132+
SqlIdentifier identifier = (SqlIdentifier) rightNode;
133+
evalConstantEquation(literal, identifier);
134+
} else if(rightNode.getKind() == SqlKind.LITERAL) {
135+
SqlLiteral literal = (SqlLiteral) rightNode;
136+
SqlIdentifier identifier = (SqlIdentifier) leftNode;
137+
evalConstantEquation(literal, identifier);
138+
} else {
139+
SqlIdentifier left = (SqlIdentifier) leftNode;
140+
SqlIdentifier right = (SqlIdentifier) rightNode;
141+
evalEquation(left, right, sideTableName, sqlNode);
142+
}
143+
}
131144

145+
/**
146+
* deal normal equation etc. foo.id = bar.id
147+
* @param left
148+
* @param right
149+
* @param sideTableName
150+
* @param sqlNode
151+
*/
152+
private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTableName, SqlNode sqlNode) {
132153
String leftTableName = left.getComponent(0).getSimple();
133154
String leftField = left.getComponent(1).getSimple();
134155

135156
String rightTableName = right.getComponent(0).getSimple();
136157
String rightField = right.getComponent(1).getSimple();
137158

138159
if (leftTableName.equalsIgnoreCase(sideTableName)) {
139-
equalFieldList.add(leftField);
140-
int equalFieldIndex = -1;
141-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
142-
String fieldName = rowTypeInfo.getFieldNames()[i];
143-
if (fieldName.equalsIgnoreCase(rightField)) {
144-
equalFieldIndex = i;
145-
}
146-
}
147-
if (equalFieldIndex == -1) {
148-
throw new RuntimeException("can't deal equal field: " + sqlNode);
149-
}
150-
151-
equalValIndex.add(equalFieldIndex);
152-
160+
associateField(rightField, leftField, sqlNode);
153161
} else if (rightTableName.equalsIgnoreCase(sideTableName)) {
154-
155-
equalFieldList.add(rightField);
156-
int equalFieldIndex = -1;
157-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
158-
String fieldName = rowTypeInfo.getFieldNames()[i];
159-
if (fieldName.equalsIgnoreCase(leftField)) {
160-
equalFieldIndex = i;
161-
}
162-
}
163-
if (equalFieldIndex == -1) {
164-
throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
165-
}
166-
167-
equalValIndex.add(equalFieldIndex);
168-
162+
associateField(leftField, rightField, sqlNode);
169163
} else {
170164
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
171165
}
172166
}
173167

168+
/**
169+
* deal with equation with constant etc. foo.id = 1
170+
* @param literal
171+
* @param identifier
172+
*/
173+
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
174+
String tableName = identifier.getComponent(0).getSimple();
175+
String fieldName = identifier.getComponent(1).getSimple();
176+
Object constant = literal.getValue();
177+
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
178+
PredicateInfo predicate = PredicateInfo.builder()
179+
.setOperatorName("=")
180+
.setOperatorKind("EQUALS")
181+
.setOwnerTable(tableName)
182+
.setFieldName(fieldName)
183+
.setCondition(constant.toString())
184+
.build();
185+
predicateInfos.add(predicate);
186+
}
187+
188+
private void associateField(String sourceTableField, String sideTableField, SqlNode sqlNode) {
189+
String errorMsg = "can't deal equal field: " + sqlNode;
190+
equalFieldList.add(sideTableField);
191+
int equalFieldIndex = -1;
192+
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
193+
String fieldName = rowTypeInfo.getFieldNames()[i];
194+
if (fieldName.equalsIgnoreCase(sourceTableField)) {
195+
equalFieldIndex = i;
196+
}
197+
}
198+
Preconditions.checkState(equalFieldIndex != -1, errorMsg);
199+
equalValIndex.add(equalFieldIndex);
200+
}
201+
174202
public abstract void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo);
175203

176204
public RowTypeInfo getRowTypeInfo() {

0 commit comments

Comments
 (0)