Skip to content

Commit 4549ec6

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 30ed946 + 4bdb8ab commit 4549ec6

3 files changed

Lines changed: 72 additions & 9 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -162,23 +162,32 @@ private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelec
162162
SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());
163163

164164
for (int index = 0; index < selectList.size(); index++) {
165-
if (selectList.get(index).getKind().equals(SqlKind.AS)
166-
|| (selectList.get(index).getClass().equals(SqlIdentifier.class)
167-
&& ((SqlIdentifier) selectList.get(index)).names.size() == 1)) {
168-
sqlNodes.add(selectList.get(index));
165+
SqlNode sqlNode = selectList.get(index);
166+
// 判断sqlNode的类型是否属于 't1.f1 as f2'
167+
boolean isAsNode = sqlNode.getKind().equals(SqlKind.AS);
168+
169+
// 判断sqlNode的结构是否属于'f1' 或者 't.*'
170+
boolean isIdentifierOrStar = sqlNode.getClass().equals(SqlIdentifier.class)
171+
// sqlNode like 'f1'
172+
&& (((SqlIdentifier) sqlNode).names.size() == 1
173+
// sqlNode like 't.*'
174+
|| StringUtils.isBlank(((SqlIdentifier) sqlNode).names.get(1)));
175+
176+
if (isAsNode || isIdentifierOrStar) {
177+
sqlNodes.add(sqlNode);
169178
continue;
170179
}
171180

172-
if (!selectList.get(index).getClass().equals(SqlIdentifier.class)) {
173-
if (selectList.get(index).getKind().equals(SqlKind.LITERAL)) {
181+
if (!sqlNode.getClass().equals(SqlIdentifier.class)) {
182+
if (sqlNode.getKind().equals(SqlKind.LITERAL)) {
174183
throw new IllegalArgumentException(String.format("Constants %s in the SELECT statement must be aliased!",
175-
selectList.get(index).toString()));
184+
sqlNode.toString()));
176185
}
177186
throw new RuntimeException(String.format("Illegal statement! Please check the statement: %s",
178-
selectList.get(index).toString()));
187+
sqlNode.toString()));
179188
}
180189

181-
sqlNodes.add(transformToAsNode(selectList.get(index)));
190+
sqlNodes.add(transformToAsNode(sqlNode));
182191
}
183192
sqlSelect.setSelectList(sqlNodes);
184193
}

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,33 @@ public HbaseAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldIn
4444
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4545
}
4646

47+
@Override
48+
public void parseSelectFields(JoinInfo joinInfo) {
49+
String sideTableName = joinInfo.getSideTableName();
50+
String nonSideTableName = joinInfo.getNonSideTable();
51+
List<String> fields = Lists.newArrayList();
52+
int sideTableFieldIndex = 0;
53+
54+
for( int i=0; i<outFieldInfoList.size(); i++){
55+
FieldInfo fieldInfo = outFieldInfoList.get(i);
56+
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
57+
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
58+
fields.add(sideFieldName);
59+
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
60+
sideFieldIndex.put(i, sideTableFieldIndex);
61+
sideFieldNameIndex.put(i, sideFieldName);
62+
sideTableFieldIndex++;
63+
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
64+
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
65+
inFieldIndex.put(i, nonSideIndex);
66+
}else{
67+
throw new RuntimeException("unknown table " + fieldInfo.getTable());
68+
}
69+
}
70+
71+
sideSelectFields = String.join(",", fields);
72+
}
73+
4774
@Override
4875
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
4976
rowKeyBuilder = new RowKeyBuilder();

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncSideInfo.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,33 @@ public HbaseAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
3333
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3434
}
3535

36+
@Override
37+
public void parseSelectFields(JoinInfo joinInfo) {
38+
String sideTableName = joinInfo.getSideTableName();
39+
String nonSideTableName = joinInfo.getNonSideTable();
40+
List<String> fields = Lists.newArrayList();
41+
int sideTableFieldIndex = 0;
42+
43+
for( int i=0; i<outFieldInfoList.size(); i++){
44+
FieldInfo fieldInfo = outFieldInfoList.get(i);
45+
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
46+
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
47+
fields.add(sideFieldName);
48+
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
49+
sideFieldIndex.put(i, sideTableFieldIndex);
50+
sideFieldNameIndex.put(i, sideFieldName);
51+
sideTableFieldIndex++;
52+
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
53+
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
54+
inFieldIndex.put(i, nonSideIndex);
55+
}else{
56+
throw new RuntimeException("unknown table " + fieldInfo.getTable());
57+
}
58+
}
59+
60+
sideSelectFields = String.join(",", fields);
61+
}
62+
3663
@Override
3764
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
3865
rowKeyBuilder = new RowKeyBuilder();

0 commit comments

Comments
 (0)