2525import com .dtstack .flink .sql .side .cache .CacheObj ;
2626import com .dtstack .flink .sql .side .cache .LRUSideCache ;
2727import org .apache .calcite .sql .JoinType ;
28- import org .apache .calcite .sql .SqlBasicCall ;
29- import org .apache .calcite .sql .SqlIdentifier ;
30- import org .apache .calcite .sql .SqlKind ;
31- import org .apache .calcite .sql .SqlNode ;
32- import org .apache .flink .api .java .typeutils .RowTypeInfo ;
33- import org .apache .flink .calcite .shaded .com .google .common .collect .Lists ;
34- import org .apache .flink .calcite .shaded .com .google .common .collect .Maps ;
3528import org .apache .flink .configuration .Configuration ;
3629import org .apache .flink .streaming .api .functions .async .ResultFuture ;
3730import org .apache .flink .streaming .api .functions .async .RichAsyncFunction ;
3831import org .apache .flink .types .Row ;
39- import org .slf4j .Logger ;
40- import org .slf4j .LoggerFactory ;
4132
4233import java .util .Collections ;
43- import java .util .List ;
44- import java .util .Map ;
4534
4635/**
4736 * All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
5342
5443public abstract class AsyncReqRow extends RichAsyncFunction <Row , Row > {
5544
56- private static final Logger LOG = LoggerFactory .getLogger (AsyncReqRow .class );
57-
5845 private static final long serialVersionUID = 2098635244857937717L ;
5946
60- protected RowTypeInfo rowTypeInfo ;
61-
62- protected List <FieldInfo > outFieldInfoList ;
63-
64- protected List <String > equalFieldList = Lists .newArrayList ();
65-
66- protected List <Integer > equalValIndex = Lists .newArrayList ();
67-
68- protected String sqlCondition = "" ;
69-
70- protected String sideSelectFields = "" ;
71-
72- protected JoinType joinType ;
47+ protected SideInfo sideInfo ;
7348
74- //key:Returns the value of the position, returns the index values in the input data
75- protected Map <Integer , Integer > inFieldIndex = Maps .newHashMap ();
76-
77- protected Map <Integer , Integer > sideFieldIndex = Maps .newHashMap ();
78-
79- protected SideTableInfo sideTableInfo ;
80-
81- protected AbsSideCache sideCache ;
82-
83- public AsyncReqRow (RowTypeInfo rowTypeInfo , JoinInfo joinInfo , List <FieldInfo > outFieldInfoList ,
84- SideTableInfo sideTableInfo ){
85- this .rowTypeInfo = rowTypeInfo ;
86- this .outFieldInfoList = outFieldInfoList ;
87- this .joinType = joinInfo .getJoinType ();
88- this .sideTableInfo = sideTableInfo ;
89- parseSelectFields (joinInfo );
90- buildEqualInfo (joinInfo , sideTableInfo );
49+ public AsyncReqRow (SideInfo sideInfo ){
50+ this .sideInfo = sideInfo ;
9151 }
9252
9353 private void initCache (){
54+ SideTableInfo sideTableInfo = sideInfo .getSideTableInfo ();
9455 if (sideTableInfo .getCacheType () == null || ECacheType .NONE .name ().equalsIgnoreCase (sideTableInfo .getCacheType ())){
9556 return ;
9657 }
9758
59+ AbsSideCache sideCache ;
9860 if (ECacheType .LRU .name ().equalsIgnoreCase (sideTableInfo .getCacheType ())){
9961 sideCache = new LRUSideCache (sideTableInfo );
62+ sideInfo .setSideCache (sideCache );
10063 }else {
10164 throw new RuntimeException ("not support side cache with type:" + sideTableInfo .getCacheType ());
10265 }
@@ -105,101 +68,22 @@ private void initCache(){
10568 }
10669
10770 protected CacheObj getFromCache (String key ){
108- return sideCache .getFromCache (key );
71+ return sideInfo . getSideCache () .getFromCache (key );
10972 }
11073
11174 protected void putCache (String key , CacheObj value ){
112- sideCache .putCache (key , value );
75+ sideInfo . getSideCache () .putCache (key , value );
11376 }
11477
11578 protected boolean openCache (){
116- return sideCache != null ;
79+ return sideInfo . getSideCache () != null ;
11780 }
11881
119- public void parseSelectFields (JoinInfo joinInfo ){
120- String sideTableName = joinInfo .getSideTableName ();
121- String nonSideTableName = joinInfo .getNonSideTable ();
122- List <String > fields = Lists .newArrayList ();
123-
124- int sideIndex = 0 ;
125- for ( int i =0 ; i <outFieldInfoList .size (); i ++){
126- FieldInfo fieldInfo = outFieldInfoList .get (i );
127- if (fieldInfo .getTable ().equalsIgnoreCase (sideTableName )){
128- fields .add (fieldInfo .getFieldName ());
129- sideFieldIndex .put (i , sideIndex );
130- sideIndex ++;
131- }else if (fieldInfo .getTable ().equalsIgnoreCase (nonSideTableName )){
132- int nonSideIndex = rowTypeInfo .getFieldIndex (fieldInfo .getFieldName ());
133- inFieldIndex .put (i , nonSideIndex );
134- }else {
135- throw new RuntimeException ("unknown table " + fieldInfo .getTable ());
136- }
137- }
138-
139- if (fields .size () == 0 ){
140- throw new RuntimeException ("select non field from table " + sideTableName );
141- }
142-
143- sideSelectFields = String .join ("," , fields );
144- }
145-
146- public abstract void buildEqualInfo (JoinInfo joinInfo , SideTableInfo sideTableInfo );
147-
148- public void dealOneEqualCon (SqlNode sqlNode , String sideTableName ){
149- if (sqlNode .getKind () != SqlKind .EQUALS ){
150- throw new RuntimeException ("not equal operator." );
151- }
152-
153- SqlIdentifier left = (SqlIdentifier )((SqlBasicCall )sqlNode ).getOperands ()[0 ];
154- SqlIdentifier right = (SqlIdentifier )((SqlBasicCall )sqlNode ).getOperands ()[1 ];
155-
156- String leftTableName = left .getComponent (0 ).getSimple ();
157- String leftField = left .getComponent (1 ).getSimple ();
158-
159- String rightTableName = right .getComponent (0 ).getSimple ();
160- String rightField = right .getComponent (1 ).getSimple ();
161-
162- if (leftTableName .equalsIgnoreCase (sideTableName )){
163- equalFieldList .add (leftField );
164- int equalFieldIndex = -1 ;
165- for (int i =0 ; i <rowTypeInfo .getFieldNames ().length ; i ++){
166- String fieldName = rowTypeInfo .getFieldNames ()[i ];
167- if (fieldName .equalsIgnoreCase (rightField )){
168- equalFieldIndex = i ;
169- }
170- }
171- if (equalFieldIndex == -1 ){
172- throw new RuntimeException ("can't find equal field " + rightField );
173- }
174-
175- equalValIndex .add (equalFieldIndex );
176-
177- }else if (rightTableName .equalsIgnoreCase (sideTableName )){
178-
179- equalFieldList .add (rightField );
180- int equalFieldIndex = -1 ;
181- for (int i =0 ; i <rowTypeInfo .getFieldNames ().length ; i ++){
182- String fieldName = rowTypeInfo .getFieldNames ()[i ];
183- if (fieldName .equalsIgnoreCase (leftField )){
184- equalFieldIndex = i ;
185- }
186- }
187- if (equalFieldIndex == -1 ){
188- throw new RuntimeException ("can't find equal field " + rightField );
189- }
190-
191- equalValIndex .add (equalFieldIndex );
192-
193- }else {
194- throw new RuntimeException ("resolve equalFieldList error:" + sqlNode .toString ());
195- }
196-
197- }
19882
19983 protected abstract Row fillData (Row input , Object sideInput );
20084
20185 protected void dealMissKey (Row input , ResultFuture <Row > resultFuture ){
202- if (joinType == JoinType .LEFT ){
86+ if (sideInfo . getJoinType () == JoinType .LEFT ){
20387 //Reserved left table data
20488 Row row = fillData (input , null );
20589 resultFuture .complete (Collections .singleton (row ));
0 commit comments