/*
 * Copyright (c) 2016, 2017 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY        RPCDBG_SVCXPRT
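
/* Receive path overview (informal summary, not part of the original
 * source):
 *
 *   1. svc_rdma_recvfrom() dequeues a completed Receive context and
 *      rebuilds rqstp->rq_arg from its pages (rdma_build_arg_xdr).
 *   2. The RPC-over-RDMA transport header is sanity-checked and
 *      stripped (svc_rdma_xdr_decode_req and the xdr_check_* helpers).
 *   3. If the header carries a Read list, the payload is pulled from
 *      the client with RDMA Read (rdma_read_chunks via the
 *      xprt->sc_reader callback) and the request is deferred until
 *      those reads complete.
 */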
/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *ctxt,
                               u32 byte_count)
{
        struct rpcrdma_msg *rmsgp;
        struct page *page;
        u32 bc;
        int sge_no;

        /* Swap the page in the SGE with the page in argpages */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len =
                min_t(size_t, byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;

        /* Compute bytes past head in the SGL */
        bc = byte_count - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = bc;
        rqstp->rq_arg.page_base = 0;

        /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
        rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
        if (rmsgp->rm_type == rdma_nomsg)
                rqstp->rq_arg.pages = &rqstp->rq_pages[0];
        else
                rqstp->rq_arg.pages = &rqstp->rq_pages[1];

        sge_no = 1;
        while (bc && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* If not all pages were used from the SGL, free the remaining ones */
        bc = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = bc;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}
/* This accommodates the largest possible Position-Zero
 * Read chunk or Reply chunk, in one segment.
 */
#define MAX_BYTES_SPECIAL_SEG   ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
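
/* For reference (summarized from RFC 5666, not part of the original
 * source): the chunk-list entries that the parsers below walk are
 * plain XDR words.  A Read list entry is
 *
 *      1 word   list discriminator (terminated by xdr_zero)
 *      1 word   Position (byte offset into the RPC message)
 *      1 word   rdma_handle (R_key)
 *      1 word   rdma_length
 *      2 words  rdma_offset (64-bit)
 *
 * while Write list and Reply chunk entries carry a segment count
 * followed by that many rpcrdma_segment_maxsz-word segments (handle,
 * length, 64-bit offset).  The parsers only bounds-check these fields;
 * they do not interpret them.
 */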
/* Sanity check the Read list.
 *
 * Implementation limits:
 * - This implementation supports only one Read chunk.
 *
 * Sanity checks:
 * - Read list does not overflow buffer.
 * - Segment size limited by largest NFS data payload.
 *
 * The segment count is limited to how many segments can
 * fit in the transport header without overflowing the
 * buffer. That's about 40 Read segments for a 1KB inline
 * threshold.
 *
 * Returns pointer to the following Write list.
 */
static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
{
        u32 position;
        bool first;

        first = true;
        while (*p++ != xdr_zero) {
                if (first) {
                        position = be32_to_cpup(p++);
                        first = false;
                } else if (be32_to_cpup(p++) != position) {
                        return NULL;
                }
                p++;    /* handle */
                if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
                        return NULL;
                p += 2; /* offset */

                if (p > end)
                        return NULL;
        }
        return p;
}
static __be32 *xdr_check_write_list(__be32 *p, __be32 *end)
{
        __be32 *next;

        while (*p++ != xdr_zero) {
                next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
                if (next > end)
                        return NULL;
                p = next;
        }
        return p;
}

static __be32 *xdr_check_reply_chunk(__be32 *p, __be32 *end)
{
        __be32 *next;

        if (*p++ != xdr_zero) {
                next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
                if (next > end)
                        return NULL;
                p = next;
        }
        return p;
}
/* On entry, xdr->head[0].iov_base points to first byte in the
 * RPC-over-RDMA header.
 *
 * On successful exit, head[0] points to first byte past the
 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
 * The length of the RPC-over-RDMA header is returned.
 *
 * Assumptions:
 * - The transport header is entirely contained in the head iovec.
 */
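
/* A short reminder of the fixed portion of the transport header that
 * the decoder below indexes word by word (informal; see struct
 * rpcrdma_msg in include/linux/sunrpc/rpc_rdma.h):
 *
 *      rdma_argp + 0   rm_xid     - copied from the RPC XID
 *      rdma_argp + 1   rm_vers    - must equal rpcrdma_version
 *      rdma_argp + 2   rm_credit  - requested credit limit
 *      rdma_argp + 3   rm_type    - rdma_msg, rdma_nomsg, and so on
 *
 * The three chunk lists start at rdma_argp + 4.
 */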
static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
{
        __be32 *p, *end, *rdma_argp;
        unsigned int hdr_len;
        char *proc;

        /* Verify that there's enough bytes for header + something */
        if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
                goto out_short;

        rdma_argp = rq_arg->head[0].iov_base;
        if (*(rdma_argp + 1) != rpcrdma_version)
                goto out_version;

        switch (*(rdma_argp + 3)) {
        case rdma_msg:
                proc = "RDMA_MSG";
                break;
        case rdma_nomsg:
                proc = "RDMA_NOMSG";
                break;

        case rdma_done:
                goto out_drop;

        case rdma_error:
                goto out_drop;

        default:
                goto out_proc;
        }

        end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
        p = xdr_check_read_list(rdma_argp + 4, end);
        if (!p)
                goto out_inval;
        p = xdr_check_write_list(p, end);
        if (!p)
                goto out_inval;
        p = xdr_check_reply_chunk(p, end);
        if (!p)
                goto out_inval;
        if (p > end)
                goto out_inval;

        rq_arg->head[0].iov_base = p;
        hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
        rq_arg->head[0].iov_len -= hdr_len;
        dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n",
                proc, be32_to_cpup(rdma_argp), hdr_len);
        return hdr_len;

out_short:
        dprintk("svcrdma: header too short = %d\n", rq_arg->len);
        return -EINVAL;

out_version:
        dprintk("svcrdma: bad xprt version: %u\n",
                be32_to_cpup(rdma_argp + 1));
        return -EPROTONOSUPPORT;

out_drop:
        dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
        return 0;

out_proc:
        dprintk("svcrdma: bad rdma procedure (%u)\n",
                be32_to_cpup(rdma_argp + 3));
        return -EINVAL;

out_inval:
        dprintk("svcrdma: failed to parse transport header\n");
        return -EINVAL;
}
/* Issue an RDMA_READ using the local lkey to map the data sink */
int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
                        struct svc_rqst *rqstp,
                        struct svc_rdma_op_ctxt *head,
                        int *page_no,
                        u32 *page_offset,
                        u32 rs_handle,
                        u32 rs_length,
                        u64 rs_offset,
                        bool last)
{
        struct ib_rdma_wr read_wr;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->read_hdr = head;
        pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd);
        read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
                     rs_length);

        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                ctxt->sge[pno].addr =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], pg_off,
                                        PAGE_SIZE - pg_off,
                                        DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           ctxt->sge[pno].addr);
                if (ret)
                        goto err;
                svc_rdma_count_mappings(xprt, ctxt);

                ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey;
                ctxt->sge[pno].length = len;
                ctxt->count++;

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        memset(&read_wr, 0, sizeof(read_wr));
        ctxt->cqe.done = svc_rdma_wc_read;
        read_wr.wr.wr_cqe = &ctxt->cqe;
        read_wr.wr.opcode = IB_WR_RDMA_READ;
        read_wr.wr.send_flags = IB_SEND_SIGNALED;
        read_wr.rkey = rs_handle;
        read_wr.remote_addr = rs_offset;
        read_wr.wr.sg_list = ctxt->sge;
        read_wr.wr.num_sge = pages_needed;

        ret = svc_rdma_send(xprt, &read_wr.wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return ret;
}
/* Issue an RDMA_READ using an FRMR to map the data sink */
int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
                         struct svc_rqst *rqstp,
                         struct svc_rdma_op_ctxt *head,
                         int *page_no,
                         u32 *page_offset,
                         u32 rs_handle,
                         u32 rs_length,
                         u64 rs_offset,
                         bool last)
{
        struct ib_rdma_wr read_wr;
        struct ib_send_wr inv_wr;
        struct ib_reg_wr reg_wr;
        u8 key;
        int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
        int ret, read, pno, dma_nents, n;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        if (IS_ERR(frmr))
                return -ENOMEM;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->frmr = frmr;
        nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len);
        read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length);

        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
        frmr->sg_nents = nents;

        for (pno = 0; pno < nents; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;

                sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no],
                            len, pg_off);

                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device,
                                  frmr->sg, frmr->sg_nents,
                                  frmr->direction);
        if (!dma_nents) {
                pr_err("svcrdma: failed to dma map sg %p\n",
                       frmr->sg);
                return -ENOMEM;
        }

        n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
        if (unlikely(n != frmr->sg_nents)) {
                pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n",
                       frmr->mr, n, frmr->sg_nents);
                return n < 0 ? n : -EINVAL;
        }

        /* Bump the key */
        key = (u8)(frmr->mr->lkey & 0x000000FF);
        ib_update_fast_reg_key(frmr->mr, ++key);

        ctxt->sge[0].addr = frmr->mr->iova;
        ctxt->sge[0].lkey = frmr->mr->lkey;
        ctxt->sge[0].length = frmr->mr->length;
        ctxt->count = 1;
        ctxt->read_hdr = head;

        /* Prepare REG WR */
        ctxt->reg_cqe.done = svc_rdma_wc_reg;
        reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
        reg_wr.wr.opcode = IB_WR_REG_MR;
        reg_wr.wr.send_flags = IB_SEND_SIGNALED;
        reg_wr.wr.num_sge = 0;
        reg_wr.mr = frmr->mr;
        reg_wr.key = frmr->mr->lkey;
        reg_wr.access = frmr->access_flags;
        reg_wr.wr.next = &read_wr.wr;

        /* Prepare RDMA_READ */
        memset(&read_wr, 0, sizeof(read_wr));
        ctxt->cqe.done = svc_rdma_wc_read;
        read_wr.wr.wr_cqe = &ctxt->cqe;
        read_wr.wr.send_flags = IB_SEND_SIGNALED;
        read_wr.rkey = rs_handle;
        read_wr.remote_addr = rs_offset;
        read_wr.wr.sg_list = ctxt->sge;
        read_wr.wr.num_sge = 1;
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
                read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
                read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
        } else {
                read_wr.wr.opcode = IB_WR_RDMA_READ;
                read_wr.wr.next = &inv_wr;
                /* Prepare invalidate */
                memset(&inv_wr, 0, sizeof(inv_wr));
                ctxt->inv_cqe.done = svc_rdma_wc_inv;
                inv_wr.wr_cqe = &ctxt->inv_cqe;
                inv_wr.opcode = IB_WR_LOCAL_INV;
                inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
                inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
        }

        /* Post the chain */
        ret = svc_rdma_send(xprt, &reg_wr.wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_put_context(ctxt, 0);
        svc_rdma_put_frmr(xprt, frmr);
        return ret;
}
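
/* Editorial note: rdma_read_chunk_lcl() and rdma_read_chunk_frmr() are
 * alternative implementations of the xprt->sc_reader callback used by
 * rdma_read_chunks() below; which one a transport uses is chosen when
 * the connection is accepted, based on the device's memory-registration
 * capabilities.
 */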
/* If there was additional inline content, append it to the end of arg.pages.
 * Tail copy has to be done after the reader function has determined how many
 * pages are needed for RDMA READ.
 */
static int
rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
               u32 position, u32 byte_count, u32 page_offset, int page_no)
{
        char *srcp, *destp;

        srcp = head->arg.head[0].iov_base + position;
        byte_count = head->arg.head[0].iov_len - position;
        if (byte_count > PAGE_SIZE) {
                dprintk("svcrdma: large tail unsupported\n");
                return 0;
        }

        /* Fit as much of the tail on the current page as possible */
        if (page_offset != PAGE_SIZE) {
                destp = page_address(rqstp->rq_arg.pages[page_no]);
                destp += page_offset;
                while (byte_count--) {
                        *destp++ = *srcp++;
                        page_offset++;
                        if (page_offset == PAGE_SIZE && byte_count)
                                goto more;
                }
                goto done;
        }

more:
        /* Fit the rest on the next page */
        page_no++;
        destp = page_address(rqstp->rq_arg.pages[page_no]);
        while (byte_count--)
                *destp++ = *srcp++;

        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

done:
        byte_count = head->arg.head[0].iov_len - position;
        head->arg.page_len += byte_count;
        head->arg.len += byte_count;
        head->arg.buflen += byte_count;
        return 1;
}
/* Returns the address of the first read chunk or <nul> if no read chunk
 * is present
 */
static struct rpcrdma_read_chunk *
svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
{
        struct rpcrdma_read_chunk *ch =
                (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];

        if (ch->rc_discrim == xdr_zero)
                return NULL;
        return ch;
}
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
                            struct rpcrdma_msg *rmsgp,
                            struct svc_rqst *rqstp,
                            struct svc_rdma_op_ctxt *head)
{
        int page_no, ret;
        struct rpcrdma_read_chunk *ch;
        u32 handle, page_offset, byte_count;
        u32 position;
        u64 rs_offset;
        bool last;

        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        /* The request is completed when the RDMA_READs complete. The
         * head context keeps all the pages that comprise the
         * request.
         */
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->hdr_count = head->count;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
        position = be32_to_cpu(ch->rc_position);
        if (position == 0) {
                head->arg.pages = &head->pages[0];
                page_offset = head->byte_len;
        } else {
                head->arg.pages = &head->pages[head->count];
                page_offset = 0;
        }

        ret = 0;
        page_no = 0;
        for (; ch->rc_discrim != xdr_zero; ch++) {
                if (be32_to_cpu(ch->rc_position) != position)
                        goto err;

                handle = be32_to_cpu(ch->rc_target.rs_handle),
                byte_count = be32_to_cpu(ch->rc_target.rs_length);
                xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
                                 &rs_offset);

                while (byte_count > 0) {
                        last = (ch + 1)->rc_discrim == xdr_zero;
                        ret = xprt->sc_reader(xprt, rqstp, head,
                                              &page_no, &page_offset,
                                              handle, byte_count,
                                              rs_offset, last);
                        if (ret < 0)
                                goto err;
                        byte_count -= ret;
                        rs_offset += ret;
                        head->arg.buflen += ret;
                }
        }

        /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
        if (page_offset & 3) {
                u32 pad = 4 - (page_offset & 3);

                head->arg.tail[0].iov_len += pad;
                head->arg.len += pad;
                head->arg.buflen += pad;
                page_offset += pad;
        }

        ret = 1;
        if (position && position < head->arg.head[0].iov_len)
                ret = rdma_copy_tail(rqstp, head, position,
                                     byte_count, page_offset, page_no);
        head->arg.head[0].iov_len = position;
        head->position = position;

 err:
        /* Detach arg pages. svc_recv will replenish them */
        for (page_no = 0;
             &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
                rqstp->rq_pages[page_no] = NULL;

        return ret;
}
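
/* Editorial note: when rdma_read_chunks() returns a positive value, the
 * RDMA Reads it posted are still in flight.  The request is deferred;
 * once the read completions arrive, the head context is queued on
 * sc_read_complete_q and svc_rdma_recvfrom() finishes assembling the
 * argument buffer via rdma_read_complete() below.
 */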
static void rdma_read_complete(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *head)
{
        int page_no;

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }

        /* Adjustments made for RDMA_NOMSG type requests */
        if (head->position == 0) {
                if (head->arg.len <= head->sge[0].length) {
                        head->arg.head[0].iov_len = head->arg.len -
                                                    head->byte_len;
                        head->arg.page_len = 0;
                } else {
                        head->arg.head[0].iov_len = head->sge[0].length -
                                                    head->byte_len;
                        head->arg.page_len = head->arg.len -
                                             head->sge[0].length;
                }
        }

        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;
        rqstp->rq_arg.page_base = head->arg.page_base;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_pages[page_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;
}
static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
                                __be32 *rdma_argp, int status)
{
        struct svc_rdma_op_ctxt *ctxt;
        __be32 *p, *err_msgp;
        unsigned int length;
        struct page *page;
        int ret;

        ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
        if (ret)
                return;

        page = alloc_page(GFP_KERNEL);
        if (!page)
                return;
        err_msgp = page_address(page);

        p = err_msgp;
        *p++ = *rdma_argp;
        *p++ = *(rdma_argp + 1);
        *p++ = xprt->sc_fc_credits;
        *p++ = rdma_error;
        if (status == -EPROTONOSUPPORT) {
                *p++ = err_vers;
                *p++ = rpcrdma_version;
                *p++ = rpcrdma_version;
        } else {
                *p++ = err_chunk;
        }
        length = (unsigned long)p - (unsigned long)err_msgp;

        /* Map transport header; no RPC message payload */
        ctxt = svc_rdma_get_context(xprt);
        ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
        if (ret) {
                dprintk("svcrdma: Error %d mapping send for protocol error\n",
                        ret);
                svc_rdma_put_context(ctxt, 1);
                return;
        }

        ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
        if (ret) {
                dprintk("svcrdma: Error %d posting send for protocol error\n",
                        ret);
                svc_rdma_unmap_dma(ctxt);
                svc_rdma_put_context(ctxt, 1);
        }
}
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
                                          __be32 *rdma_resp)
{
        __be32 *p;

        if (!xprt->xpt_bc_xprt)
                return false;

        p = rdma_resp + 3;
        if (*p++ != rdma_msg)
                return false;

        if (*p++ != xdr_zero)
                return false;
        if (*p++ != xdr_zero)
                return false;
        if (*p++ != xdr_zero)
                return false;

        /* XID sanity */
        if (*p++ != *rdma_resp)
                return false;
        /* call direction */
        if (*p == cpu_to_be32(RPC_CALL))
                return false;

        return true;
}
/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;

        dprintk("svcrdma: rqstp=%p\n", rqstp);

        spin_lock(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q,
                                        struct svc_rdma_op_ctxt, list);
                list_del(&ctxt->list);
                spin_unlock(&rdma_xprt->sc_rq_dto_lock);
                rdma_read_complete(rqstp, ctxt);
                goto complete;
        } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q,
                                        struct svc_rdma_op_ctxt, list);
                list_del(&ctxt->list);
        } else {
                atomic_inc(&rdma_stat_rq_starve);
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                ctxt = NULL;
        }
        spin_unlock(&rdma_xprt->sc_rq_dto_lock);
        if (!ctxt) {
                /* This is the EAGAIN path. The svc_recv routine will
                 * return -EAGAIN, the nfsd thread will go to call into
                 * svc_recv again and we shouldn't be on the active
                 * transport list
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto defer;
                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n",
                ctxt, rdma_xprt, rqstp);
        atomic_inc(&rdma_stat_recv);

        /* Build up the XDR from the receive buffers. */
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

        /* Decode the RDMA header. */
        rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
        ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
        if (ret < 0)
                goto out_err;
        if (ret == 0)
                goto out_drop;
        rqstp->rq_xprt_hlen = ret;

        if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
                ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
                                               &rmsgp->rm_xid,
                                               &rqstp->rq_arg);
                svc_rdma_put_context(ctxt, 0);
                if (ret)
                        goto repost;
                return ret;
        }

        /* Read read-list data. */
        ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
        } else if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
        }

complete:
        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        svc_rdma_put_context(ctxt, 0);
 out:
        dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
                ret, rqstp->rq_arg.len,
                rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;

out_err:
        svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
        svc_rdma_put_context(ctxt, 0);
        return 0;

defer:
        return 0;

out_drop:
        svc_rdma_put_context(ctxt, 1);
repost:
        return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
}