@@ -163,14 +163,15 @@ void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 	cc->cc_sqecount = 0;
 }
 
-/*
- * The consumed rw_ctx's are cleaned and placed on a local llist so
- * that only one atomic llist operation is needed to put them all
- * back on the free list.
+/**
+ * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
+ * @rdma: controlling transport instance
+ * @cc: svc_rdma_chunk_ctxt to be released
+ * @dir: DMA direction
  */
-static void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
-				struct svc_rdma_chunk_ctxt *cc,
-				enum dma_data_direction dir)
+void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
+			 struct svc_rdma_chunk_ctxt *cc,
+			 enum dma_data_direction dir)
 {
 	struct llist_node *first, *last;
 	struct svc_rdma_rw_ctxt *ctxt;
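
The comment removed above described why svc_rdma_cc_release() batches its rw_ctxts: a private chain is built first, so the shared free list is touched with only one atomic operation. A minimal user-space sketch of that batching pattern follows, using C11 atomics rather than the kernel's llist API; the pool and ctxt names are hypothetical and only the batch-splice idea is taken from the patch.

/*
 * Illustrative sketch only (not kernel code): chain freed contexts on a
 * local list, then splice the whole chain onto the shared free list with
 * a single compare-and-swap, as the kernel's llist_add_batch() does.
 */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct free_ctxt {
	struct free_ctxt *next;
	int id;
};

struct ctxt_pool {
	_Atomic(struct free_ctxt *) head;
};

/* Splice a pre-built chain [first..last] onto the pool with one CAS loop. */
static void pool_add_batch(struct ctxt_pool *pool,
			   struct free_ctxt *first, struct free_ctxt *last)
{
	struct free_ctxt *old = atomic_load(&pool->head);

	do {
		last->next = old;
	} while (!atomic_compare_exchange_weak(&pool->head, &old, first));
}

int main(void)
{
	struct ctxt_pool pool = { .head = NULL };
	struct free_ctxt *first = NULL, *last = NULL;

	/* Build the local chain without touching the shared pool. */
	for (int i = 0; i < 4; i++) {
		struct free_ctxt *c = malloc(sizeof(*c));

		c->id = i;
		c->next = first;
		first = c;
		if (!last)
			last = c;
	}

	pool_add_batch(&pool, first, last);	/* one atomic splice */

	for (struct free_ctxt *c = atomic_load(&pool.head); c; c = c->next)
		printf("returned ctxt %d\n", c->id);
	return 0;
}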
@@ -300,23 +301,35 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
 	struct svc_rdma_recv_ctxt *ctxt;
 
+	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+
+	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
 	switch (wc->status) {
 	case IB_WC_SUCCESS:
-		ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
 		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
 				      cc->cc_posttime);
-		break;
+
+		spin_lock(&rdma->sc_rq_dto_lock);
+		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
+		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
+		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+		spin_unlock(&rdma->sc_rq_dto_lock);
+		svc_xprt_enqueue(&rdma->sc_xprt);
+		return;
 	case IB_WC_WR_FLUSH_ERR:
 		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
 		break;
 	default:
 		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
 	}
 
-	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
-	cc->cc_status = wc->status;
-	complete(&cc->cc_done);
-	return;
+	/* The RDMA Read has flushed, so the incoming RPC message
+	 * cannot be constructed and must be dropped. Signal the
+	 * loss to the client by closing the connection.
+	 */
+	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
+	svc_rdma_recv_ctxt_put(rdma, ctxt);
+	svc_xprt_deferred_close(&rdma->sc_xprt);
 }
 
 /*
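
The reworked svc_rdma_wc_read_done() above no longer completes a sleeping waiter. On success it queues the receive context on sc_read_complete_q under sc_rq_dto_lock, sets XPT_DATA, and enqueues the transport so a server thread finishes the message later. Below is a rough user-space analogue of that "queue under a lock, flag, wake a worker" shape, written with pthreads; every name in it is made up for illustration and none of it is kernel API.

/*
 * Illustrative analogue (not kernel code): the completion handler never
 * blocks; it defers the remaining work to a worker thread via a queue.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct done_ctxt {
	struct done_ctxt *next;
	int id;
};

static struct done_ctxt *read_done_queue;	/* like sc_read_complete_q */
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cv = PTHREAD_COND_INITIALIZER;
static bool data_ready;				/* like XPT_DATA */

/* Completion-handler analogue: queue the finished context and wake a worker. */
static void read_done(struct done_ctxt *ctxt)
{
	pthread_mutex_lock(&queue_lock);
	ctxt->next = read_done_queue;
	read_done_queue = ctxt;
	data_ready = true;
	pthread_mutex_unlock(&queue_lock);
	pthread_cond_signal(&queue_cv);		/* like svc_xprt_enqueue() */
}

/* Server-thread analogue: drains the queue when there is data to process. */
static void *reaper(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&queue_lock);
	while (!data_ready)
		pthread_cond_wait(&queue_cv, &queue_lock);
	while (read_done_queue) {
		struct done_ctxt *ctxt = read_done_queue;

		read_done_queue = ctxt->next;
		printf("finishing deferred work for ctxt %d\n", ctxt->id);
	}
	data_ready = false;
	pthread_mutex_unlock(&queue_lock);
	return NULL;
}

int main(void)
{
	struct done_ctxt a = { .id = 1 }, b = { .id = 2 };
	pthread_t t;

	read_done(&a);
	read_done(&b);
	pthread_create(&t, NULL, reaper, NULL);
	pthread_join(&t, NULL);
	return 0;
}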
@@ -823,7 +836,6 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
 			      struct svc_rdma_recv_ctxt *head)
 {
 	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
-	struct xdr_buf *buf = &rqstp->rq_arg;
 	struct svc_rdma_chunk *chunk, *next;
 	unsigned int start, length;
 	int ret;
@@ -853,18 +865,7 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
 
 	start += length;
 	length = head->rc_byte_len - start;
-	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
-	if (ret < 0)
-		return ret;
-
-	buf->len += head->rc_readbytes;
-	buf->buflen += head->rc_readbytes;
-
-	buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
-	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
-	buf->pages = &rqstp->rq_pages[1];
-	buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
-	return 0;
+	return svc_rdma_copy_inline_range(rqstp, head, start, length);
 }
 
 /**
@@ -888,42 +889,8 @@ svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
 static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
 				   struct svc_rdma_recv_ctxt *head)
 {
-	struct xdr_buf *buf = &rqstp->rq_arg;
-	struct svc_rdma_chunk *chunk;
-	unsigned int length;
-	int ret;
-
-	chunk = pcl_first_chunk(&head->rc_read_pcl);
-	ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
-	if (ret < 0)
-		goto out;
-
-	/* Split the Receive buffer between the head and tail
-	 * buffers at Read chunk's position. XDR roundup of the
-	 * chunk is not included in either the pagelist or in
-	 * the tail.
-	 */
-	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
-	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
-	buf->head[0].iov_len = chunk->ch_position;
-
-	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
-	 *
-	 * If the client already rounded up the chunk length, the
-	 * length does not change. Otherwise, the length of the page
-	 * list is increased to include XDR round-up.
-	 *
-	 * Currently these chunks always start at page offset 0,
-	 * thus the rounded-up length never crosses a page boundary.
-	 */
-	buf->pages = &rqstp->rq_pages[0];
-	length = xdr_align_size(chunk->ch_length);
-	buf->page_len = length;
-	buf->len += length;
-	buf->buflen += length;
-
-out:
-	return ret;
+	return svc_rdma_build_read_chunk(rqstp, head,
+					 pcl_first_chunk(&head->rc_read_pcl));
 }
 
 /**
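
The code removed from svc_rdma_read_data_item() split the inline Receive buffer at the chunk's XDR position and sized the page list to the XDR round-up of the chunk length. A small standalone example of that arithmetic follows; the lengths are made up, and it only mirrors the calculation, not the kernel structures.

/*
 * Worked example (not kernel code): XDR encodes data in 4-byte units,
 * so a Read chunk's length is rounded up to the next multiple of four
 * when sizing the page list (see RFC 8166, s. 3.4.5.2).
 */
#include <stdio.h>

/* Mirrors the kernel's xdr_align_size(): round up to a 4-byte boundary. */
static unsigned int xdr_align(unsigned int len)
{
	return (len + 3) & ~3u;
}

int main(void)
{
	unsigned int ch_length = 1021;	/* hypothetical chunk length */
	unsigned int ch_position = 120;	/* hypothetical XDR offset of chunk */
	unsigned int head_len = 200;	/* hypothetical inline head length */

	/* The page list carries the rounded-up chunk... */
	printf("page_len = %u\n", xdr_align(ch_length));	/* 1024 */

	/* ...and the inline head is split at the chunk's position:
	 * bytes before ch_position stay in head[0], the remainder
	 * becomes the tail, as the removed code did.
	 */
	printf("head[0].iov_len = %u, tail[0].iov_len = %u\n",
	       ch_position, head_len - ch_position);		/* 120, 80 */
	return 0;
}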
@@ -1051,23 +1018,28 @@ static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
 					  struct svc_rdma_recv_ctxt *head)
 {
-	struct xdr_buf *buf = &rqstp->rq_arg;
-	int ret;
-
-	ret = svc_rdma_read_call_chunk(rqstp, head);
-	if (ret < 0)
-		goto out;
-
-	buf->len += head->rc_readbytes;
-	buf->buflen += head->rc_readbytes;
+	return svc_rdma_read_call_chunk(rqstp, head);
+}
 
-	buf->head[0].iov_base = page_address(rqstp->rq_pages[0]);
-	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, head->rc_readbytes);
-	buf->pages = &rqstp->rq_pages[1];
-	buf->page_len = head->rc_readbytes - buf->head[0].iov_len;
+/* Pages under I/O have been copied to head->rc_pages. Ensure that
+ * svc_xprt_release() does not put them when svc_rdma_recvfrom()
+ * returns. This has to be done after all Read WRs are constructed
+ * to properly handle a page that happens to be part of I/O on behalf
+ * of two different RDMA segments.
+ *
+ * Note: if the subsequent post_send fails, these pages have already
+ * been moved to head->rc_pages and thus will be cleaned up by
+ * svc_rdma_recv_ctxt_put().
+ */
+static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
+				      struct svc_rdma_recv_ctxt *head)
+{
+	unsigned int i;
 
-out:
-	return ret;
+	for (i = 0; i < head->rc_page_count; i++) {
+		head->rc_pages[i] = rqstp->rq_pages[i];
+		rqstp->rq_pages[i] = NULL;
+	}
 }
 
 /**
@@ -1113,30 +1085,11 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
 		ret = svc_rdma_read_multiple_chunks(rqstp, head);
 	} else
 		ret = svc_rdma_read_special(rqstp, head);
+	svc_rdma_clear_rqst_pages(rqstp, head);
 	if (ret < 0)
-		goto out_err;
+		return ret;
 
 	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
-	init_completion(&cc->cc_done);
 	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
-	if (ret < 0)
-		goto out_err;
-
-	ret = 1;
-	wait_for_completion(&cc->cc_done);
-	if (cc->cc_status != IB_WC_SUCCESS)
-		ret = -EIO;
-
-	/* rq_respages starts after the last arg page */
-	rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
-	rqstp->rq_next_page = rqstp->rq_respages + 1;
-
-	/* Ensure svc_rdma_recv_ctxt_put() does not release pages
-	 * left in @rc_pages while I/O proceeds.
-	 */
-	head->rc_page_count = 0;
-
-out_err:
-	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
-	return ret;
+	return ret < 0 ? ret : 1;
 }