Skip to content

Commit 016f2e9

Browse files
author
xuchao
committed
修改1.10 连续join维表问题
1 parent 622b4b8 commit 016f2e9

8 files changed

Lines changed: 51 additions & 26 deletions

File tree

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
9191
if (leftTableName.equalsIgnoreCase(sideTableName)) {
9292
equalFieldList.add(leftField);
9393
int equalFieldIndex = -1;
94-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
95-
String fieldName = rowTypeInfo.getFieldNames()[i];
94+
for (int i = 0; i < getFieldNames().length; i++) {
95+
String fieldName = getFieldNames()[i];
9696
if (fieldName.equalsIgnoreCase(rightField)) {
9797
equalFieldIndex = i;
9898
}
@@ -107,8 +107,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
107107

108108
equalFieldList.add(rightField);
109109
int equalFieldIndex = -1;
110-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
111-
String fieldName = rowTypeInfo.getFieldNames()[i];
110+
for (int i = 0; i < getFieldNames().length; i++) {
111+
String fieldName = getFieldNames()[i];
112112
if (fieldName.equalsIgnoreCase(leftField)) {
113113
equalFieldIndex = i;
114114
}

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
139139
if(leftTableName.equalsIgnoreCase(sideTableName)){
140140
equalFieldList.add(leftField);
141141
int equalFieldIndex = -1;
142-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
143-
String fieldName = rowTypeInfo.getFieldNames()[i];
142+
for(int i=0; i<getFieldNames().length; i++){
143+
String fieldName = getFieldNames()[i];
144144
if(fieldName.equalsIgnoreCase(rightField)){
145145
equalFieldIndex = i;
146146
}
@@ -155,8 +155,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
155155

156156
equalFieldList.add(rightField);
157157
int equalFieldIndex = -1;
158-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
159-
String fieldName = rowTypeInfo.getFieldNames()[i];
158+
for(int i=0; i<getFieldNames().length; i++){
159+
String fieldName = getFieldNames()[i];
160160
if(fieldName.equalsIgnoreCase(leftField)){
161161
equalFieldIndex = i;
162162
}
@@ -281,4 +281,18 @@ public void setSideSelectFieldsType(Map<Integer, String> sideSelectFieldsType) {
281281
public String getSelectSideFieldType(int index){
282282
return sideSelectFieldsType.get(index);
283283
}
284+
285+
public String[] getFieldNames(){
286+
287+
int fieldTypeLength = rowTypeInfo.getFieldTypes().length;
288+
if( fieldTypeLength == 2
289+
&& rowTypeInfo.getFieldTypes()[1].getClass().equals(RowTypeInfo.class)){
290+
return ((RowTypeInfo)rowTypeInfo.getFieldTypes()[1]).getFieldNames();
291+
} else if(fieldTypeLength ==1
292+
&& rowTypeInfo.getFieldTypes()[0].getClass().equals(RowTypeInfo.class)){
293+
return ((RowTypeInfo)rowTypeInfo.getFieldTypes()[0]).getFieldNames();
294+
}else {
295+
return rowTypeInfo.getFieldNames();
296+
}
297+
}
284298
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,18 @@ private static List<FieldInfo> getAllField(JoinScope scope){
120120
}
121121

122122
resolved = (JoinScope.ScopeChild)prefixId.next();
123-
field = resolved.getRowTypeInfo();
123+
int fieldTypeLength = resolved.getRowTypeInfo().getFieldTypes().length;
124+
if(fieldTypeLength == 2
125+
&& resolved.getRowTypeInfo().getFieldTypes()[1].getClass().equals(RowTypeInfo.class)){
126+
field = (RowTypeInfo) resolved.getRowTypeInfo().getFieldTypes()[1];
127+
} else if(fieldTypeLength ==1
128+
&& resolved.getRowTypeInfo().getFieldTypes()[0].getClass().equals(RowTypeInfo.class)){
129+
field = (RowTypeInfo) resolved.getRowTypeInfo().getFieldTypes()[0];
130+
}else{
131+
field = resolved.getRowTypeInfo();
132+
}
133+
134+
124135
String[] fieldNames = field.getFieldNames();
125136
TypeInformation<?>[] types = field.getFieldTypes();
126137
for(int i=0; i< field.getTotalFields(); i++){

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ private void joinFun(Object pollObj,
426426

427427
RowTypeInfo sideOutTypeInfo = buildOutRowTypeInfo(sideJoinFieldInfo, mappingTable);
428428

429-
TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(Types.BOOLEAN, sideOutTypeInfo);
429+
TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(sideOutTypeInfo);
430430
dsOut.getTransformation().setOutputType(tupleTypeInfo);
431431

432432
String targetTableName = joinInfo.getNewTableName();

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncSideInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
7575
if (leftTableName.equalsIgnoreCase(sideTableName)) {
7676
equalFieldList.add(leftField);
7777
int equalFieldIndex = -1;
78-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
79-
String fieldName = rowTypeInfo.getFieldNames()[i];
78+
for (int i = 0; i < getFieldNames().length; i++) {
79+
String fieldName = getFieldNames()[i];
8080
if (fieldName.equalsIgnoreCase(rightField)) {
8181
equalFieldIndex = i;
8282
}
@@ -91,8 +91,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
9191

9292
equalFieldList.add(rightField);
9393
int equalFieldIndex = -1;
94-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
95-
String fieldName = rowTypeInfo.getFieldNames()[i];
94+
for (int i = 0; i < getFieldNames().length; i++) {
95+
String fieldName = getFieldNames()[i];
9696
if (fieldName.equalsIgnoreCase(leftField)) {
9797
equalFieldIndex = i;
9898
}

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncSideInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
6060
if (leftTableName.equalsIgnoreCase(sideTableName)) {
6161
equalFieldList.add(leftField);
6262
int equalFieldIndex = -1;
63-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
64-
String fieldName = rowTypeInfo.getFieldNames()[i];
63+
for (int i = 0; i < getFieldNames().length; i++) {
64+
String fieldName = getFieldNames()[i];
6565
if (fieldName.equalsIgnoreCase(rightField)) {
6666
equalFieldIndex = i;
6767
}
@@ -76,8 +76,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
7676

7777
equalFieldList.add(rightField);
7878
int equalFieldIndex = -1;
79-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
80-
String fieldName = rowTypeInfo.getFieldNames()[i];
79+
for (int i = 0; i < getFieldNames().length; i++) {
80+
String fieldName = getFieldNames()[i];
8181
if (fieldName.equalsIgnoreCase(leftField)) {
8282
equalFieldIndex = i;
8383
}

mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncSideInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
7979
if(leftTableName.equalsIgnoreCase(sideTableName)){
8080
equalFieldList.add(leftField);
8181
int equalFieldIndex = -1;
82-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
83-
String fieldName = rowTypeInfo.getFieldNames()[i];
82+
for(int i=0; i<getFieldNames().length; i++){
83+
String fieldName = getFieldNames()[i];
8484
if(fieldName.equalsIgnoreCase(rightField)){
8585
equalFieldIndex = i;
8686
}
@@ -95,8 +95,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
9595

9696
equalFieldList.add(rightField);
9797
int equalFieldIndex = -1;
98-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
99-
String fieldName = rowTypeInfo.getFieldNames()[i];
98+
for(int i=0; i<getFieldNames().length; i++){
99+
String fieldName = getFieldNames()[i];
100100
if(fieldName.equalsIgnoreCase(leftField)){
101101
equalFieldIndex = i;
102102
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
101101
if (leftTableName.equalsIgnoreCase(sideTableName)) {
102102
equalFieldList.add(leftField);
103103
int equalFieldIndex = -1;
104-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
105-
String fieldName = rowTypeInfo.getFieldNames()[i];
104+
for (int i = 0; i < getFieldNames().length; i++) {
105+
String fieldName = getFieldNames()[i];
106106
if (fieldName.equalsIgnoreCase(rightField)) {
107107
equalFieldIndex = i;
108108
}
@@ -117,8 +117,8 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
117117

118118
equalFieldList.add(rightField);
119119
int equalFieldIndex = -1;
120-
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
121-
String fieldName = rowTypeInfo.getFieldNames()[i];
120+
for (int i = 0; i < getFieldNames().length; i++) {
121+
String fieldName = getFieldNames()[i];
122122
if (fieldName.equalsIgnoreCase(leftField)) {
123123
equalFieldIndex = i;
124124
}

0 commit comments

Comments
 (0)