3838import org .apache .flink .streaming .api .functions .async .RichAsyncFunction ;
3939import org .apache .flink .streaming .api .operators .StreamingRuntimeContext ;
4040import org .apache .flink .streaming .runtime .tasks .ProcessingTimeService ;
41- import org .apache .flink .table .api .DataTypes ;
4241import org .apache .flink .table .dataformat .BaseRow ;
42+ import org .apache .flink .table .dataformat .GenericRow ;
4343import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
44- import org .apache .flink .types .Row ;
4544import org .slf4j .Logger ;
4645import org .slf4j .LoggerFactory ;
4746
6463 * @author xuchao
6564 */
6665
67- public abstract class BaseAsyncReqRow extends RichAsyncFunction <Row , BaseRow > implements ISideReqRow {
66+ public abstract class BaseAsyncReqRow extends RichAsyncFunction <BaseRow , BaseRow > implements ISideReqRow {
6867 private static final Logger LOG = LoggerFactory .getLogger (BaseAsyncReqRow .class );
6968 private static final long serialVersionUID = 2098635244857937717L ;
7069 private RuntimeContext runtimeContext ;
@@ -137,12 +136,12 @@ protected boolean openCache() {
137136 return sideInfo .getSideCache () != null ;
138137 }
139138
140- protected void dealMissKey (Row input , ResultFuture <BaseRow > resultFuture ) {
139+ protected void dealMissKey (BaseRow input , ResultFuture <BaseRow > resultFuture ) {
141140 if (sideInfo .getJoinType () == JoinType .LEFT ) {
142141 //Reserved left table data
143142 try {
144- Row row = fillData (input , null );
145- RowDataComplete .completeRow (resultFuture , row );
143+ BaseRow row = fillData (input , null );
144+ RowDataComplete .completeBaseRow (resultFuture , row );
146145 } catch (Exception e ) {
147146 dealFillDataError (input , resultFuture , e );
148147 }
@@ -158,7 +157,7 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
158157 }
159158
160159 @ Override
161- public void timeout (Row input , ResultFuture <BaseRow > resultFuture ) throws Exception {
160+ public void timeout (BaseRow input , ResultFuture <BaseRow > resultFuture ) throws Exception {
162161
163162 if (timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0 ) {
164163 LOG .info ("Async function call has timed out. input:{}, timeOutNum:{}" , input .toString (), timeOutNum );
@@ -175,32 +174,32 @@ public void timeout(Row input, ResultFuture<BaseRow> resultFuture) throws Except
175174 resultFuture .complete (Collections .EMPTY_LIST );
176175 }
177176
178- protected void preInvoke (Row input , ResultFuture <BaseRow > resultFuture )
177+ protected void preInvoke (BaseRow input , ResultFuture <BaseRow > resultFuture )
179178 throws InvocationTargetException , IllegalAccessException {
180179 registerTimerAndAddToHandler (input , resultFuture );
181180 }
182181
183182 @ Override
184- public void asyncInvoke (Row row , ResultFuture <BaseRow > resultFuture ) throws Exception {
185- Row input = Row .copy (row );
186- preInvoke (input , resultFuture );
187- Map <String , Object > inputParams = parseInputParam (input );
183+ public void asyncInvoke (BaseRow row , ResultFuture <BaseRow > resultFuture ) throws Exception {
184+ preInvoke (row , resultFuture );
185+ Map <String , Object > inputParams = parseInputParam (row );
188186 if (MapUtils .isEmpty (inputParams )) {
189- dealMissKey (input , resultFuture );
187+ dealMissKey (row , resultFuture );
190188 return ;
191189 }
192190 if (isUseCache (inputParams )) {
193- invokeWithCache (inputParams , input , resultFuture );
191+ invokeWithCache (inputParams , row , resultFuture );
194192 return ;
195193 }
196- handleAsyncInvoke (inputParams , input , resultFuture );
194+ handleAsyncInvoke (inputParams , row , resultFuture );
197195 }
198196
199- private Map <String , Object > parseInputParam (Row input ) {
197+ private Map <String , Object > parseInputParam (BaseRow input ) {
198+ GenericRow genericRow = (GenericRow ) input ;
200199 Map <String , Object > inputParams = Maps .newLinkedHashMap ();
201200 for (int i = 0 ; i < sideInfo .getEqualValIndex ().size (); i ++) {
202201 Integer conValIndex = sideInfo .getEqualValIndex ().get (i );
203- Object equalObj = input .getField (conValIndex );
202+ Object equalObj = genericRow .getField (conValIndex );
204203 if (equalObj == null ) {
205204 return inputParams ;
206205 }
@@ -214,7 +213,7 @@ protected boolean isUseCache(Map<String, Object> inputParams) {
214213 return openCache () && getFromCache (buildCacheKey (inputParams )) != null ;
215214 }
216215
217- private void invokeWithCache (Map <String , Object > inputParams , Row input , ResultFuture <BaseRow > resultFuture ) {
216+ private void invokeWithCache (Map <String , Object > inputParams , BaseRow input , ResultFuture <BaseRow > resultFuture ) {
218217 if (openCache ()) {
219218 CacheObj val = getFromCache (buildCacheKey (inputParams ));
220219 if (val != null ) {
@@ -223,19 +222,19 @@ private void invokeWithCache(Map<String, Object> inputParams, Row input, ResultF
223222 return ;
224223 } else if (ECacheContentType .SingleLine == val .getType ()) {
225224 try {
226- Row row = fillData (input , val .getContent ());
227- RowDataComplete .completeRow (resultFuture , row );
225+ BaseRow row = fillData (input , val .getContent ());
226+ RowDataComplete .completeBaseRow (resultFuture , row );
228227 } catch (Exception e ) {
229228 dealFillDataError (input , resultFuture , e );
230229 }
231230 } else if (ECacheContentType .MultiLine == val .getType ()) {
232231 try {
233- List <Row > rowList = Lists .newArrayList ();
232+ List <BaseRow > rowList = Lists .newArrayList ();
234233 for (Object one : (List ) val .getContent ()) {
235- Row row = fillData (input , one );
234+ BaseRow row = fillData (input , one );
236235 rowList .add (row );
237236 }
238- RowDataComplete .completeRow (resultFuture ,rowList );
237+ RowDataComplete .completeBaseRow (resultFuture ,rowList );
239238 } catch (Exception e ) {
240239 dealFillDataError (input , resultFuture , e );
241240 }
@@ -247,22 +246,22 @@ private void invokeWithCache(Map<String, Object> inputParams, Row input, ResultF
247246 }
248247 }
249248
250- public abstract void handleAsyncInvoke (Map <String , Object > inputParams , Row input , ResultFuture <BaseRow > resultFuture ) throws Exception ;
249+ public abstract void handleAsyncInvoke (Map <String , Object > inputParams , BaseRow input , ResultFuture <BaseRow > resultFuture ) throws Exception ;
251250
252251 public abstract String buildCacheKey (Map <String , Object > inputParams );
253252
254253 private ProcessingTimeService getProcessingTimeService () {
255254 return ((StreamingRuntimeContext ) this .runtimeContext ).getProcessingTimeService ();
256255 }
257256
258- protected ScheduledFuture <?> registerTimer (Row input , ResultFuture <BaseRow > resultFuture ) {
257+ protected ScheduledFuture <?> registerTimer (BaseRow input , ResultFuture <BaseRow > resultFuture ) {
259258 long timeoutTimestamp = sideInfo .getSideTableInfo ().getAsyncTimeout () + getProcessingTimeService ().getCurrentProcessingTime ();
260259 return getProcessingTimeService ().registerTimer (
261260 timeoutTimestamp ,
262261 timestamp -> timeout (input , resultFuture ));
263262 }
264263
265- protected void registerTimerAndAddToHandler (Row input , ResultFuture <BaseRow > resultFuture )
264+ protected void registerTimerAndAddToHandler (BaseRow input , ResultFuture <BaseRow > resultFuture )
266265 throws InvocationTargetException , IllegalAccessException {
267266 ScheduledFuture <?> timeFuture = registerTimer (input , resultFuture );
268267 // resultFuture 是ResultHandler 的实例
@@ -272,7 +271,7 @@ protected void registerTimerAndAddToHandler(Row input, ResultFuture<BaseRow> res
272271 }
273272
274273
275- protected void dealFillDataError (Row input , ResultFuture <BaseRow > resultFuture , Throwable e ) {
274+ protected void dealFillDataError (BaseRow input , ResultFuture <BaseRow > resultFuture , Throwable e ) {
276275 parseErrorRecords .inc ();
277276 if (parseErrorRecords .getCount () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )) {
278277 LOG .info ("dealFillDataError" , e );
0 commit comments