]> git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
Merge tag 'nfs-rdma-4.5' of git://git.linux-nfs.org/projects/anna/nfs-rdma
authorTrond Myklebust <trond.myklebust@primarydata.com>
Mon, 28 Dec 2015 19:49:14 +0000 (14:49 -0500)
committerTrond Myklebust <trond.myklebust@primarydata.com>
Mon, 28 Dec 2015 19:49:14 +0000 (14:49 -0500)
NFS: NFSoRDMA Client Side Changes

These patches mostly fix send queue ordering issues inside the NFSoRDMA
client, but there are also two patches from Dan Carpenter fixing up smatch
warnings.

Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
* tag 'nfs-rdma-4.5' of git://git.linux-nfs.org/projects/anna/nfs-rdma:
  xprtrdma: Revert commit e7104a2a9606 ('xprtrdma: Cap req_cqinit').
  xprtrdma: Invalidate in the RPC reply handler
  xprtrdma: Add ro_unmap_sync method for all-physical registration
  xprtrdma: Add ro_unmap_sync method for FMR
  xprtrdma: Add ro_unmap_sync method for FRWR
  xprtrdma: Introduce ro_unmap_sync method
  xprtrdma: Move struct ib_send_wr off the stack
  xprtrdma: Disable RPC/RDMA backchannel debugging messages
  xprtrdma: xprt_rdma_free() must not release backchannel reqs
  xprtrdma: Fix additional uses of spin_lock_irqsave(rb_lock)
  xprtrdma: checking for NULL instead of IS_ERR()
  xprtrdma: clean up some curly braces

net/sunrpc/xprtrdma/backchannel.c
net/sunrpc/xprtrdma/fmr_ops.c
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/physical_ops.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 2dcb44f69e539e530c208086acb4a1bee6e710ee..cc1251d0729707751d87537e1cbc6567aef1596c 100644 (file)
@@ -15,7 +15,7 @@
 # define RPCDBG_FACILITY       RPCDBG_TRANS
 #endif
 
-#define RPCRDMA_BACKCHANNEL_DEBUG
+#undef RPCRDMA_BACKCHANNEL_DEBUG
 
 static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
@@ -42,8 +42,8 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
        size_t size;
 
        req = rpcrdma_create_req(r_xprt);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
        req->rl_backchannel = true;
 
        size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
@@ -84,9 +84,7 @@ out_fail:
 static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
 {
-       struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
        struct rpcrdma_rep *rep;
-       unsigned long flags;
        int rc = 0;
 
        while (count--) {
@@ -98,9 +96,7 @@ static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                        break;
                }
 
-               spin_lock_irqsave(&buffers->rb_lock, flags);
-               list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-               spin_unlock_irqrestore(&buffers->rb_lock, flags);
+               rpcrdma_recv_buffer_put(rep);
        }
 
        return rc;
@@ -140,6 +136,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
                               __func__);
                        goto out_free;
                }
+               dprintk("RPC:       %s: new rqst %p\n", __func__, rqst);
 
                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
@@ -220,12 +217,14 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 
        rpclen = rqst->rq_svec[0].iov_len;
 
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
                __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
        pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
        pr_info("RPC:       %s:      RPC: %*ph\n",
                __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
+#endif
 
        req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
@@ -269,6 +268,9 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
 {
        struct rpc_xprt *xprt = rqst->rq_xprt;
 
+       dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
+               __func__, rqst, rpcr_to_rdmar(rqst));
+
        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
@@ -333,9 +335,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
-#ifdef RPCRDMA_BACKCHANNEL_DEBUG
-       pr_info("RPC:       %s: using rqst %p\n", __func__, rqst);
-#endif
+       dprintk("RPC:       %s: using rqst %p\n", __func__, rqst);
 
        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
@@ -355,10 +355,8 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
-#ifdef RPCRDMA_BACKCHANNEL_DEBUG
-       pr_info("RPC:       %s: attaching rep %p to req %p\n",
+       dprintk("RPC:       %s: attaching rep %p to req %p\n",
                __func__, rep, req);
-#endif
        req->rl_reply = rep;
 
        /* Defeat the retransmit detection logic in send_request */
index f1e8dafbd5079b3406a769ba4854ecba229edca6..c14f3a4bff6826aea365804eb4201c24d3e59b84 100644 (file)
@@ -179,6 +179,69 @@ out_maperr:
        return rc;
 }
 
+static void
+__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+{
+       struct ib_device *device = r_xprt->rx_ia.ri_device;
+       struct rpcrdma_mw *mw = seg->rl_mw;
+       int nsegs = seg->mr_nsegs;
+
+       seg->rl_mw = NULL;
+
+       while (nsegs--)
+               rpcrdma_unmap_one(device, seg++);
+
+       rpcrdma_put_mw(r_xprt, mw);
+}
+
+/* Invalidate all memory regions that were registered for "req".
+ *
+ * Sleeps until it is safe for the host CPU to access the
+ * previously mapped memory regions.
+ */
+static void
+fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+       struct rpcrdma_mr_seg *seg;
+       unsigned int i, nchunks;
+       struct rpcrdma_mw *mw;
+       LIST_HEAD(unmap_list);
+       int rc;
+
+       dprintk("RPC:       %s: req %p\n", __func__, req);
+
+       /* ORDER: Invalidate all of the req's MRs first
+        *
+        * ib_unmap_fmr() is slow, so use a single call instead
+        * of one call per mapped MR.
+        */
+       for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+               seg = &req->rl_segments[i];
+               mw = seg->rl_mw;
+
+               list_add(&mw->r.fmr.fmr->list, &unmap_list);
+
+               i += seg->mr_nsegs;
+       }
+       rc = ib_unmap_fmr(&unmap_list);
+       if (rc)
+               pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc);
+
+       /* ORDER: Now DMA unmap all of the req's MRs, and return
+        * them to the free MW list.
+        */
+       for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+               seg = &req->rl_segments[i];
+
+               __fmr_dma_unmap(r_xprt, seg);
+
+               i += seg->mr_nsegs;
+               seg->mr_nsegs = 0;
+       }
+
+       req->rl_nchunks = 0;
+}
+
 /* Use the ib_unmap_fmr() verb to prevent further remote
  * access via RDMA READ or RDMA WRITE.
  */
@@ -231,6 +294,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
 
 const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
        .ro_map                         = fmr_op_map,
+       .ro_unmap_sync                  = fmr_op_unmap_sync,
        .ro_unmap                       = fmr_op_unmap,
        .ro_open                        = fmr_op_open,
        .ro_maxpages                    = fmr_op_maxpages,
index 88cf9e7269c2bd0d626bc64448254be73f602940..c6836844bd0eb65ffa7134a6a014d60b5b42af1f 100644 (file)
@@ -245,12 +245,14 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
                     rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
 }
 
-/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs to be reset. */
+/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
+ * to be reset.
+ *
+ * WARNING: Only wr_id and status are reliable at this point
+ */
 static void
-frwr_sendcompletion(struct ib_wc *wc)
+__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
 {
-       struct rpcrdma_mw *r;
-
        if (likely(wc->status == IB_WC_SUCCESS))
                return;
 
@@ -261,9 +263,23 @@ frwr_sendcompletion(struct ib_wc *wc)
        else
                pr_warn("RPC:       %s: frmr %p error, status %s (%d)\n",
                        __func__, r, ib_wc_status_msg(wc->status), wc->status);
+
        r->r.frmr.fr_state = FRMR_IS_STALE;
 }
 
+static void
+frwr_sendcompletion(struct ib_wc *wc)
+{
+       struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+       struct rpcrdma_frmr *f = &r->r.frmr;
+
+       if (unlikely(wc->status != IB_WC_SUCCESS))
+               __frwr_sendcompletion_flush(wc, r);
+
+       if (f->fr_waiter)
+               complete(&f->fr_linv_done);
+}
+
 static int
 frwr_op_init(struct rpcrdma_xprt *r_xprt)
 {
@@ -319,7 +335,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        struct rpcrdma_mw *mw;
        struct rpcrdma_frmr *frmr;
        struct ib_mr *mr;
-       struct ib_reg_wr reg_wr;
+       struct ib_reg_wr *reg_wr;
        struct ib_send_wr *bad_wr;
        int rc, i, n, dma_nents;
        u8 key;
@@ -335,7 +351,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
        frmr = &mw->r.frmr;
        frmr->fr_state = FRMR_IS_VALID;
+       frmr->fr_waiter = false;
        mr = frmr->fr_mr;
+       reg_wr = &frmr->fr_regwr;
 
        if (nsegs > ia->ri_max_frmr_depth)
                nsegs = ia->ri_max_frmr_depth;
@@ -381,19 +399,19 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        key = (u8)(mr->rkey & 0x000000FF);
        ib_update_fast_reg_key(mr, ++key);
 
-       reg_wr.wr.next = NULL;
-       reg_wr.wr.opcode = IB_WR_REG_MR;
-       reg_wr.wr.wr_id = (uintptr_t)mw;
-       reg_wr.wr.num_sge = 0;
-       reg_wr.wr.send_flags = 0;
-       reg_wr.mr = mr;
-       reg_wr.key = mr->rkey;
-       reg_wr.access = writing ?
-                       IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
-                       IB_ACCESS_REMOTE_READ;
+       reg_wr->wr.next = NULL;
+       reg_wr->wr.opcode = IB_WR_REG_MR;
+       reg_wr->wr.wr_id = (uintptr_t)mw;
+       reg_wr->wr.num_sge = 0;
+       reg_wr->wr.send_flags = 0;
+       reg_wr->mr = mr;
+       reg_wr->key = mr->rkey;
+       reg_wr->access = writing ?
+                        IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
+                        IB_ACCESS_REMOTE_READ;
 
        DECR_CQCOUNT(&r_xprt->rx_ep);
-       rc = ib_post_send(ia->ri_id->qp, &reg_wr.wr, &bad_wr);
+       rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
        if (rc)
                goto out_senderr;
 
@@ -413,6 +431,116 @@ out_senderr:
        return rc;
 }
 
+static struct ib_send_wr *
+__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
+{
+       struct rpcrdma_mw *mw = seg->rl_mw;
+       struct rpcrdma_frmr *f = &mw->r.frmr;
+       struct ib_send_wr *invalidate_wr;
+
+       f->fr_waiter = false;
+       f->fr_state = FRMR_IS_INVALID;
+       invalidate_wr = &f->fr_invwr;
+
+       memset(invalidate_wr, 0, sizeof(*invalidate_wr));
+       invalidate_wr->wr_id = (unsigned long)(void *)mw;
+       invalidate_wr->opcode = IB_WR_LOCAL_INV;
+       invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
+
+       return invalidate_wr;
+}
+
+static void
+__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
+                int rc)
+{
+       struct ib_device *device = r_xprt->rx_ia.ri_device;
+       struct rpcrdma_mw *mw = seg->rl_mw;
+       struct rpcrdma_frmr *f = &mw->r.frmr;
+
+       seg->rl_mw = NULL;
+
+       ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);
+
+       if (!rc)
+               rpcrdma_put_mw(r_xprt, mw);
+       else
+               __frwr_queue_recovery(mw);
+}
+
+/* Invalidate all memory regions that were registered for "req".
+ *
+ * Sleeps until it is safe for the host CPU to access the
+ * previously mapped memory regions.
+ */
+static void
+frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+       struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_mr_seg *seg;
+       unsigned int i, nchunks;
+       struct rpcrdma_frmr *f;
+       int rc;
+
+       dprintk("RPC:       %s: req %p\n", __func__, req);
+
+       /* ORDER: Invalidate all of the req's MRs first
+        *
+        * Chain the LOCAL_INV Work Requests and post them with
+        * a single ib_post_send() call.
+        */
+       invalidate_wrs = pos = prev = NULL;
+       seg = NULL;
+       for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+               seg = &req->rl_segments[i];
+
+               pos = __frwr_prepare_linv_wr(seg);
+
+               if (!invalidate_wrs)
+                       invalidate_wrs = pos;
+               else
+                       prev->next = pos;
+               prev = pos;
+
+               i += seg->mr_nsegs;
+       }
+       f = &seg->rl_mw->r.frmr;
+
+       /* Strong send queue ordering guarantees that when the
+        * last WR in the chain completes, all WRs in the chain
+        * are complete.
+        */
+       f->fr_invwr.send_flags = IB_SEND_SIGNALED;
+       f->fr_waiter = true;
+       init_completion(&f->fr_linv_done);
+       INIT_CQCOUNT(&r_xprt->rx_ep);
+
+       /* Transport disconnect drains the receive CQ before it
+        * replaces the QP. The RPC reply handler won't call us
+        * unless ri_id->qp is a valid pointer.
+        */
+       rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
+       if (rc)
+               pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+
+       wait_for_completion(&f->fr_linv_done);
+
+       /* ORDER: Now DMA unmap all of the req's MRs, and return
+        * them to the free MW list.
+        */
+       for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+               seg = &req->rl_segments[i];
+
+               __frwr_dma_unmap(r_xprt, seg, rc);
+
+               i += seg->mr_nsegs;
+               seg->mr_nsegs = 0;
+       }
+
+       req->rl_nchunks = 0;
+}
+
 /* Post a LOCAL_INV Work Request to prevent further remote access
  * via RDMA READ or RDMA WRITE.
  */
@@ -423,23 +551,24 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mw *mw = seg1->rl_mw;
        struct rpcrdma_frmr *frmr = &mw->r.frmr;
-       struct ib_send_wr invalidate_wr, *bad_wr;
+       struct ib_send_wr *invalidate_wr, *bad_wr;
        int rc, nsegs = seg->mr_nsegs;
 
        dprintk("RPC:       %s: FRMR %p\n", __func__, mw);
 
        seg1->rl_mw = NULL;
        frmr->fr_state = FRMR_IS_INVALID;
+       invalidate_wr = &mw->r.frmr.fr_invwr;
 
-       memset(&invalidate_wr, 0, sizeof(invalidate_wr));
-       invalidate_wr.wr_id = (unsigned long)(void *)mw;
-       invalidate_wr.opcode = IB_WR_LOCAL_INV;
-       invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey;
+       memset(invalidate_wr, 0, sizeof(*invalidate_wr));
+       invalidate_wr->wr_id = (uintptr_t)mw;
+       invalidate_wr->opcode = IB_WR_LOCAL_INV;
+       invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);
 
        ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
        read_lock(&ia->ri_qplock);
-       rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+       rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr);
        read_unlock(&ia->ri_qplock);
        if (rc)
                goto out_err;
@@ -471,6 +600,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
 
 const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
        .ro_map                         = frwr_op_map,
+       .ro_unmap_sync                  = frwr_op_unmap_sync,
        .ro_unmap                       = frwr_op_unmap,
        .ro_open                        = frwr_op_open,
        .ro_maxpages                    = frwr_op_maxpages,
index 617b76f22154c41b41cdccd329cbbe9f00a3fabe..dbb302ecf59012702f03ca1b77b8efdd9624d52b 100644 (file)
@@ -83,6 +83,18 @@ physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
        return 1;
 }
 
+/* DMA unmap all memory regions that were mapped for "req".
+ */
+static void
+physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+       struct ib_device *device = r_xprt->rx_ia.ri_device;
+       unsigned int i;
+
+       for (i = 0; req->rl_nchunks; --req->rl_nchunks)
+               rpcrdma_unmap_one(device, &req->rl_segments[i++]);
+}
+
 static void
 physical_op_destroy(struct rpcrdma_buffer *buf)
 {
@@ -90,6 +102,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf)
 
 const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
        .ro_map                         = physical_op_map,
+       .ro_unmap_sync                  = physical_op_unmap_sync,
        .ro_unmap                       = physical_op_unmap,
        .ro_open                        = physical_op_open,
        .ro_maxpages                    = physical_op_maxpages,
index c10d9699441c3390d384e90f7dfc15ef9cd0ee27..0f28f2d743eda144dcd6cde7bd0a4fdb118bccc2 100644 (file)
@@ -804,6 +804,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        if (req->rl_reply)
                goto out_duplicate;
 
+       /* Sanity checking has passed. We are now committed
+        * to complete this transaction.
+        */
+       list_del_init(&rqst->rq_list);
+       spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
                "                   RPC request 0x%p xid 0x%08x\n",
                        __func__, rep, req, rqst,
@@ -888,12 +893,23 @@ badheader:
                break;
        }
 
+       /* Invalidate and flush the data payloads before waking the
+        * waiting application. This guarantees the memory region is
+        * properly fenced from the server before the application
+        * accesses the data. It also ensures proper send flow
+        * control: waking the next RPC waits until this RPC has
+        * relinquished all its Send Queue entries.
+        */
+       if (req->rl_nchunks)
+               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
+
        credits = be32_to_cpu(headerp->rm_credit);
        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > r_xprt->rx_buf.rb_max_requests)
                credits = r_xprt->rx_buf.rb_max_requests;
 
+       spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
        xprt->cwnd = credits << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
index 8c545f7d75257ef72135f8dec24a91ad86fbbc9f..740bddcf3488fe9c2e6db48f0b7967909a4caca0 100644 (file)
@@ -576,6 +576,9 @@ xprt_rdma_free(void *buffer)
 
        rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
        req = rb->rg_owner;
+       if (req->rl_backchannel)
+               return;
+
        r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
 
        dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
index eadd1655145a3bc5b81bdefb7015792fb3be566a..732c71ce5dcab6758da0fe7661b877bfd8fa77ad 100644 (file)
@@ -616,10 +616,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
        /* set trigger for requesting send completion */
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
-       if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
-               ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
-       else if (ep->rep_cqinit <= 2)
-               ep->rep_cqinit = 0;
+       if (ep->rep_cqinit <= 2)
+               ep->rep_cqinit = 0;     /* always signal? */
        INIT_CQCOUNT(ep);
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
@@ -852,10 +850,11 @@ retry:
 
                if (extras) {
                        rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
-                       if (rc)
+                       if (rc) {
                                pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
                                        __func__, rc);
                                rc = 0;
+                       }
                }
        }
 
@@ -1337,15 +1336,14 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_rep *rep;
-       unsigned long flags;
        int rc;
 
        while (count--) {
-               spin_lock_irqsave(&buffers->rb_lock, flags);
+               spin_lock(&buffers->rb_lock);
                if (list_empty(&buffers->rb_recv_bufs))
                        goto out_reqbuf;
                rep = rpcrdma_buffer_get_rep_locked(buffers);
-               spin_unlock_irqrestore(&buffers->rb_lock, flags);
+               spin_unlock(&buffers->rb_lock);
 
                rc = rpcrdma_ep_post_recv(ia, ep, rep);
                if (rc)
@@ -1355,7 +1353,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
        return 0;
 
 out_reqbuf:
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_unlock(&buffers->rb_lock);
        pr_warn("%s: no extra receive buffers\n", __func__);
        return -ENOMEM;
 
index ac7f8d4f632a9e923fdcf3fdbd8f628ad044d34b..728101ddc44bfc4b3b6247c8d13f3222cb0f1386 100644 (file)
@@ -88,12 +88,6 @@ struct rpcrdma_ep {
        struct delayed_work     rep_connect_worker;
 };
 
-/*
- * Force a signaled SEND Work Request every so often,
- * in case the provider needs to do some housekeeping.
- */
-#define RPCRDMA_MAX_UNSIGNALED_SENDS   (32)
-
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
 #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
 
@@ -207,6 +201,12 @@ struct rpcrdma_frmr {
        enum rpcrdma_frmr_state         fr_state;
        struct work_struct              fr_work;
        struct rpcrdma_xprt             *fr_xprt;
+       bool                            fr_waiter;
+       struct completion               fr_linv_done;;
+       union {
+               struct ib_reg_wr        fr_regwr;
+               struct ib_send_wr       fr_invwr;
+       };
 };
 
 struct rpcrdma_fmr {
@@ -364,6 +364,8 @@ struct rpcrdma_xprt;
 struct rpcrdma_memreg_ops {
        int             (*ro_map)(struct rpcrdma_xprt *,
                                  struct rpcrdma_mr_seg *, int, bool);
+       void            (*ro_unmap_sync)(struct rpcrdma_xprt *,
+                                        struct rpcrdma_req *);
        int             (*ro_unmap)(struct rpcrdma_xprt *,
                                    struct rpcrdma_mr_seg *);
        int             (*ro_open)(struct rpcrdma_ia *,