xprtrdma: Reset FRMRs after a flushed LOCAL_INV Work Request

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 13dbd1c389ff07b02c6fa362ebbbf5fd5b6662d7..ca55acf423650e2338c4ce1773620704c12b21df 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -61,6 +61,8 @@
 # define RPCDBG_FACILITY       RPCDBG_TRANS
 #endif
 
+static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
+
 /*
  * internal functions
  */
@@ -152,13 +154,15 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 
        if (wc->wr_id == 0ULL)
                return;
-       if (wc->status != IB_WC_SUCCESS)
+       if (wc->status != IB_WC_SUCCESS) {
+               frmr->r.frmr.fr_state = FRMR_IS_STALE;
                return;
+       }
 
        if (wc->opcode == IB_WC_FAST_REG_MR)
-               frmr->r.frmr.state = FRMR_IS_VALID;
+               frmr->r.frmr.fr_state = FRMR_IS_VALID;
        else if (wc->opcode == IB_WC_LOCAL_INV)
-               frmr->r.frmr.state = FRMR_IS_INVALID;
+               frmr->r.frmr.fr_state = FRMR_IS_INVALID;
 }
 
 static int
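
For context on the fr_state handling above: the completion handler now tracks
three FRMR states, with FRMR_IS_STALE marking an FRMR whose Work Request was
flushed. A minimal sketch of that state tracking, assuming a definition along
these lines in xprt_rdma.h (the type name and comments are illustrative, not
quoted from the patch):

	enum rpcrdma_frmr_state {
		FRMR_IS_INVALID,	/* not registered; safe to post FAST_REG_MR */
		FRMR_IS_VALID,		/* FAST_REG_MR completed; rkey is live */
		FRMR_IS_STALE,		/* a WR on this FRMR flushed; needs recovery */
	};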
@@ -310,6 +314,13 @@ rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
        rpcrdma_recvcq_poll(cq, ep);
 }
 
+static void
+rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
+{
+       rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
+       rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
+}
+
 #ifdef RPC_DEBUG
 static const char * const conn[] = {
        "address resolved",
@@ -613,6 +624,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
        /* Else will do memory reg/dereg for each chunk */
        ia->ri_memreg_strategy = memreg;
 
+       rwlock_init(&ia->ri_qplock);
        return 0;
 out2:
        rdma_destroy_id(ia->ri_id);
@@ -859,7 +871,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 int
 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
-       struct rdma_cm_id *id;
+       struct rdma_cm_id *id, *old;
        int rc = 0;
        int retry_count = 0;
 
@@ -871,9 +883,10 @@ retry:
                if (rc && rc != -ENOTCONN)
                        dprintk("RPC:       %s: rpcrdma_ep_disconnect"
                                " status %i\n", __func__, rc);
+               rpcrdma_flush_cqs(ep);
 
-               rpcrdma_clean_cq(ep->rep_attr.recv_cq);
-               rpcrdma_clean_cq(ep->rep_attr.send_cq);
+               if (ia->ri_memreg_strategy == RPCRDMA_FRMR)
+                       rpcrdma_reset_frmrs(ia);
 
                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                id = rpcrdma_create_id(xprt, ia,
@@ -905,9 +918,14 @@ retry:
                        rc = -ENETUNREACH;
                        goto out;
                }
-               rdma_destroy_qp(ia->ri_id);
-               rdma_destroy_id(ia->ri_id);
+
+               write_lock(&ia->ri_qplock);
+               old = ia->ri_id;
                ia->ri_id = id;
+               write_unlock(&ia->ri_qplock);
+
+               rdma_destroy_qp(old);
+               rdma_destroy_id(old);
        } else {
                dprintk("RPC:       %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
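
The swap above is what the new ri_qplock protects: posting paths pin
ia->ri_id under the read lock so the connect worker cannot destroy the old
QP underneath an ib_post_send(). Condensed from this patch's own hunks (no
new logic), the pairing looks roughly like this:

	/* connect worker: replace the cm_id under the write lock,
	 * then tear down the old QP/ID outside it.
	 */
	write_lock(&ia->ri_qplock);
	old = ia->ri_id;
	ia->ri_id = id;
	write_unlock(&ia->ri_qplock);
	rdma_destroy_qp(old);
	rdma_destroy_id(old);

	/* posting paths (see the later hunks): keep the current QP
	 * alive for the duration of the post.
	 */
	read_lock(&ia->ri_qplock);
	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
	read_unlock(&ia->ri_qplock);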
@@ -979,8 +997,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
        int rc;
 
-       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
-       rpcrdma_clean_cq(ep->rep_attr.send_cq);
+       rpcrdma_flush_cqs(ep);
        rc = rdma_disconnect(ia->ri_id);
        if (!rc) {
                /* returns without wait if not connected */
@@ -1064,6 +1081,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
        p += cdata->padding;
 
        INIT_LIST_HEAD(&buf->rb_mws);
+       INIT_LIST_HEAD(&buf->rb_all);
        r = (struct rpcrdma_mw *)p;
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
@@ -1088,6 +1106,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
                                ib_dereg_mr(r->r.frmr.fr_mr);
                                goto out;
                        }
+                       list_add(&r->mw_all, &buf->rb_all);
                        list_add(&r->mw_list, &buf->rb_mws);
                        ++r;
                }
@@ -1106,6 +1125,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
                                        " failed %i\n", __func__, rc);
                                goto out;
                        }
+                       list_add(&r->mw_all, &buf->rb_all);
                        list_add(&r->mw_list, &buf->rb_mws);
                        ++r;
                }
@@ -1215,6 +1235,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
        while (!list_empty(&buf->rb_mws)) {
                r = list_entry(buf->rb_mws.next,
                        struct rpcrdma_mw, mw_list);
+               list_del(&r->mw_all);
                list_del(&r->mw_list);
                switch (ia->ri_memreg_strategy) {
                case RPCRDMA_FRMR:
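
With the mw_all linkage added in rpcrdma_buffer_create and removed here
alongside mw_list, every MW now sits on two lists: rb_all always, rb_mws only
while free, which is what lets rpcrdma_reset_frmrs() walk in-use FRMRs too.
A rough sketch of that arrangement, assuming simplified versions of the
structs in xprt_rdma.h (the *_sketch names and field layout are hypothetical):

	struct rpcrdma_mw_sketch {
		struct list_head	mw_list;	/* on rb_mws while free */
		struct list_head	mw_all;		/* always on rb_all */
		/* ... FRMR or FMR registration state ... */
	};

	struct rpcrdma_buffer_sketch {
		spinlock_t		rb_lock;	/* protects rb_mws */
		struct list_head	rb_mws;		/* free MWs */
		struct list_head	rb_all;		/* every allocated MW */
		/* ... send/recv buffer arrays, indexes ... */
	};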
@@ -1242,6 +1263,206 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
        kfree(buf->rb_pool);
 }
 
+/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
+ * an unusable state. Find FRMRs in this state and dereg / reg
+ * each.  FRMRs that are VALID and attached to an rpcrdma_req are
+ * also torn down.
+ *
+ * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
+ *
+ * This is invoked only in the transport connect worker in order
+ * to serialize with rpcrdma_register_frmr_external().
+ */
+static void
+rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
+{
+       struct rpcrdma_xprt *r_xprt =
+                               container_of(ia, struct rpcrdma_xprt, rx_ia);
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct list_head *pos;
+       struct rpcrdma_mw *r;
+       int rc;
+
+       list_for_each(pos, &buf->rb_all) {
+               r = list_entry(pos, struct rpcrdma_mw, mw_all);
+
+               if (r->r.frmr.fr_state == FRMR_IS_INVALID)
+                       continue;
+
+               rc = ib_dereg_mr(r->r.frmr.fr_mr);
+               if (rc)
+                       dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
+                               __func__, rc);
+               ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+
+               r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
+                                       ia->ri_max_frmr_depth);
+               if (IS_ERR(r->r.frmr.fr_mr)) {
+                       rc = PTR_ERR(r->r.frmr.fr_mr);
+                       dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
+                               " failed %i\n", __func__, rc);
+                       continue;
+               }
+               r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
+                                       ia->ri_id->device,
+                                       ia->ri_max_frmr_depth);
+               if (IS_ERR(r->r.frmr.fr_pgl)) {
+                       rc = PTR_ERR(r->r.frmr.fr_pgl);
+                       dprintk("RPC:       %s: "
+                               "ib_alloc_fast_reg_page_list "
+                               "failed %i\n", __func__, rc);
+
+                       ib_dereg_mr(r->r.frmr.fr_mr);
+                       continue;
+               }
+               r->r.frmr.fr_state = FRMR_IS_INVALID;
+       }
+}
+
+/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
+ * some req segments uninitialized.
+ */
+static void
+rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
+{
+       if (*mw) {
+               list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
+               *mw = NULL;
+       }
+}
+
+/* Cycle mw's back in reverse order, and "spin" them.
+ * This delays and scrambles reuse as much as possible.
+ */
+static void
+rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_mr_seg *seg = req->rl_segments;
+       struct rpcrdma_mr_seg *seg1 = seg;
+       int i;
+
+       for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
+               rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
+       rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
+}
+
+static void
+rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+       buf->rb_send_bufs[--buf->rb_send_index] = req;
+       req->rl_niovs = 0;
+       if (req->rl_reply) {
+               buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
+               req->rl_reply->rr_func = NULL;
+               req->rl_reply = NULL;
+       }
+}
+
+/* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
+ * Redo only the ib_post_send().
+ */
+static void
+rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
+{
+       struct rpcrdma_xprt *r_xprt =
+                               container_of(ia, struct rpcrdma_xprt, rx_ia);
+       struct ib_send_wr invalidate_wr, *bad_wr;
+       int rc;
+
+       dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
+
+       /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
+       r->r.frmr.fr_state = FRMR_IS_VALID;
+
+       memset(&invalidate_wr, 0, sizeof(invalidate_wr));
+       invalidate_wr.wr_id = (unsigned long)(void *)r;
+       invalidate_wr.opcode = IB_WR_LOCAL_INV;
+       invalidate_wr.send_flags = IB_SEND_SIGNALED;
+       invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
+       DECR_CQCOUNT(&r_xprt->rx_ep);
+
+       dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
+               __func__, r, r->r.frmr.fr_mr->rkey);
+
+       read_lock(&ia->ri_qplock);
+       rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+       read_unlock(&ia->ri_qplock);
+       if (rc) {
+               /* Force rpcrdma_buffer_get() to retry */
+               r->r.frmr.fr_state = FRMR_IS_STALE;
+               dprintk("RPC:       %s: ib_post_send failed, %i\n",
+                       __func__, rc);
+       }
+}
+
+static void
+rpcrdma_retry_flushed_linv(struct list_head *stale,
+                          struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+       struct list_head *pos;
+       struct rpcrdma_mw *r;
+       unsigned long flags;
+
+       list_for_each(pos, stale) {
+               r = list_entry(pos, struct rpcrdma_mw, mw_list);
+               rpcrdma_retry_local_inv(r, ia);
+       }
+
+       spin_lock_irqsave(&buf->rb_lock, flags);
+       list_splice_tail(stale, &buf->rb_mws);
+       spin_unlock_irqrestore(&buf->rb_lock, flags);
+}
+
+static struct rpcrdma_req *
+rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
+                        struct list_head *stale)
+{
+       struct rpcrdma_mw *r;
+       int i;
+
+       i = RPCRDMA_MAX_SEGS - 1;
+       while (!list_empty(&buf->rb_mws)) {
+               r = list_entry(buf->rb_mws.next,
+                              struct rpcrdma_mw, mw_list);
+               list_del(&r->mw_list);
+               if (r->r.frmr.fr_state == FRMR_IS_STALE) {
+                       list_add(&r->mw_list, stale);
+                       continue;
+               }
+               req->rl_segments[i].mr_chunk.rl_mw = r;
+               if (unlikely(i-- == 0))
+                       return req;     /* Success */
+       }
+
+       /* Not enough entries on rb_mws for this req */
+       rpcrdma_buffer_put_sendbuf(req, buf);
+       rpcrdma_buffer_put_mrs(req, buf);
+       return NULL;
+}
+
+static struct rpcrdma_req *
+rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_mw *r;
+       int i;
+
+       i = RPCRDMA_MAX_SEGS - 1;
+       while (!list_empty(&buf->rb_mws)) {
+               r = list_entry(buf->rb_mws.next,
+                              struct rpcrdma_mw, mw_list);
+               list_del(&r->mw_list);
+               req->rl_segments[i].mr_chunk.rl_mw = r;
+               if (unlikely(i-- == 0))
+                       return req;     /* Success */
+       }
+
+       /* Not enough entries on rb_mws for this req */
+       rpcrdma_buffer_put_sendbuf(req, buf);
+       rpcrdma_buffer_put_mrs(req, buf);
+       return NULL;
+}
+
 /*
  * Get a set of request/reply buffers.
  *
@@ -1254,10 +1475,10 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
+       struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
+       struct list_head stale;
        struct rpcrdma_req *req;
        unsigned long flags;
-       int i;
-       struct rpcrdma_mw *r;
 
        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_send_index == buffers->rb_max_requests) {
@@ -1277,16 +1498,21 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
-       if (!list_empty(&buffers->rb_mws)) {
-               i = RPCRDMA_MAX_SEGS - 1;
-               do {
-                       r = list_entry(buffers->rb_mws.next,
-                                       struct rpcrdma_mw, mw_list);
-                       list_del(&r->mw_list);
-                       req->rl_segments[i].mr_chunk.rl_mw = r;
-               } while (--i >= 0);
+
+       INIT_LIST_HEAD(&stale);
+       switch (ia->ri_memreg_strategy) {
+       case RPCRDMA_FRMR:
+               req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
+               break;
+       case RPCRDMA_MTHCAFMR:
+               req = rpcrdma_buffer_get_fmrs(req, buffers);
+               break;
+       default:
+               break;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       if (!list_empty(&stale))
+               rpcrdma_retry_flushed_linv(&stale, buffers);
        return req;
 }
 
@@ -1299,34 +1525,14 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
 {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
-       int i;
        unsigned long flags;
 
        spin_lock_irqsave(&buffers->rb_lock, flags);
-       buffers->rb_send_bufs[--buffers->rb_send_index] = req;
-       req->rl_niovs = 0;
-       if (req->rl_reply) {
-               buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
-               req->rl_reply->rr_func = NULL;
-               req->rl_reply = NULL;
-       }
+       rpcrdma_buffer_put_sendbuf(req, buffers);
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
        case RPCRDMA_MTHCAFMR:
-               /*
-                * Cycle mw's back in reverse order, and "spin" them.
-                * This delays and scrambles reuse as much as possible.
-                */
-               i = 1;
-               do {
-                       struct rpcrdma_mw **mw;
-                       mw = &req->rl_segments[i].mr_chunk.rl_mw;
-                       list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
-                       *mw = NULL;
-               } while (++i < RPCRDMA_MAX_SEGS);
-               list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
-                                       &buffers->rb_mws);
-               req->rl_segments[0].mr_chunk.rl_mw = NULL;
+               rpcrdma_buffer_put_mrs(req, buffers);
                break;
        default:
                break;
@@ -1388,6 +1594,9 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
         */
        iov->addr = ib_dma_map_single(ia->ri_id->device,
                        va, len, DMA_BIDIRECTIONAL);
+       if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
+               return -ENOMEM;
+
        iov->length = len;
 
        if (ia->ri_have_dma_lkey) {
@@ -1483,6 +1692,9 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                        struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_mr_seg *seg1 = seg;
+       struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
+       struct rpcrdma_frmr *frmr = &mw->r.frmr;
+       struct ib_mr *mr = frmr->fr_mr;
        struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
 
        u8 key;
@@ -1502,8 +1714,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                rpcrdma_map_one(ia, seg, writing);
                pa = seg->mr_dma;
                for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
-                       seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
-                               page_list[page_no++] = pa;
+                       frmr->fr_pgl->page_list[page_no++] = pa;
                        pa += PAGE_SIZE;
                }
                len += seg->mr_len;
@@ -1515,20 +1726,18 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                        break;
        }
        dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
-               __func__, seg1->mr_chunk.rl_mw, i);
+               __func__, mw, i);
 
-       if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
+       if (unlikely(frmr->fr_state != FRMR_IS_INVALID)) {
                dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
-                       __func__,
-                       seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
+                       __func__, mr->rkey);
                /* Invalidate before using. */
                memset(&invalidate_wr, 0, sizeof invalidate_wr);
-               invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+               invalidate_wr.wr_id = (unsigned long)(void *)mw;
                invalidate_wr.next = &frmr_wr;
                invalidate_wr.opcode = IB_WR_LOCAL_INV;
                invalidate_wr.send_flags = IB_SEND_SIGNALED;
-               invalidate_wr.ex.invalidate_rkey =
-                       seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+               invalidate_wr.ex.invalidate_rkey = mr->rkey;
                DECR_CQCOUNT(&r_xprt->rx_ep);
                post_wr = &invalidate_wr;
        } else
@@ -1536,28 +1745,27 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 
        /* Prepare FRMR WR */
        memset(&frmr_wr, 0, sizeof frmr_wr);
-       frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+       frmr_wr.wr_id = (unsigned long)(void *)mw;
        frmr_wr.opcode = IB_WR_FAST_REG_MR;
        frmr_wr.send_flags = IB_SEND_SIGNALED;
        frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
-       frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
+       frmr_wr.wr.fast_reg.page_list = frmr->fr_pgl;
        frmr_wr.wr.fast_reg.page_list_len = page_no;
        frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
        frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
        if (frmr_wr.wr.fast_reg.length < len) {
-               while (seg1->mr_nsegs--)
-                       rpcrdma_unmap_one(ia, seg++);
-               return -EIO;
+               rc = -EIO;
+               goto out_err;
        }
 
        /* Bump the key */
-       key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
-       ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
+       key = (u8)(mr->rkey & 0x000000FF);
+       ib_update_fast_reg_key(mr, ++key);
 
        frmr_wr.wr.fast_reg.access_flags = (writing ?
                                IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                                IB_ACCESS_REMOTE_READ);
-       frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+       frmr_wr.wr.fast_reg.rkey = mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);
 
        rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
@@ -1565,15 +1773,19 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
        if (rc) {
                dprintk("RPC:       %s: failed ib_post_send for register,"
                        " status %i\n", __func__, rc);
-               while (i--)
-                       rpcrdma_unmap_one(ia, --seg);
+               ib_update_fast_reg_key(mr, --key);
+               goto out_err;
        } else {
-               seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+               seg1->mr_rkey = mr->rkey;
                seg1->mr_base = seg1->mr_dma + pageoff;
                seg1->mr_nsegs = i;
                seg1->mr_len = len;
        }
        *nsegs = i;
+       return 0;
+out_err:
+       while (i--)
+               rpcrdma_unmap_one(ia, --seg);
        return rc;
 }
 
@@ -1585,9 +1797,6 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
        struct ib_send_wr invalidate_wr, *bad_wr;
        int rc;
 
-       while (seg1->mr_nsegs--)
-               rpcrdma_unmap_one(ia, seg++);
-
        memset(&invalidate_wr, 0, sizeof invalidate_wr);
        invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
        invalidate_wr.opcode = IB_WR_LOCAL_INV;
@@ -1595,7 +1804,11 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
        invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);
 
+       read_lock(&ia->ri_qplock);
+       while (seg1->mr_nsegs--)
+               rpcrdma_unmap_one(ia, seg++);
        rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+       read_unlock(&ia->ri_qplock);
        if (rc)
                dprintk("RPC:       %s: failed ib_post_send for invalidate,"
                        " status %i\n", __func__, rc);
@@ -1656,8 +1869,10 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
 
        list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
        rc = ib_unmap_fmr(&l);
+       read_lock(&ia->ri_qplock);
        while (seg1->mr_nsegs--)
                rpcrdma_unmap_one(ia, seg++);
+       read_unlock(&ia->ri_qplock);
        if (rc)
                dprintk("RPC:       %s: failed ib_unmap_fmr,"
                        " status %i\n", __func__, rc);
@@ -1713,7 +1928,9 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
 
 #if RPCRDMA_PERSISTENT_REGISTRATION
        case RPCRDMA_ALLPHYSICAL:
+               read_lock(&ia->ri_qplock);
                rpcrdma_unmap_one(ia, seg);
+               read_unlock(&ia->ri_qplock);
                break;
 #endif
 
@@ -1809,3 +2026,44 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
                        rc);
        return rc;
 }
+
+/* Physical mapping means one Read/Write list entry per-page.
+ * All list entries must fit within an inline buffer
+ *
+ * NB: The server must return a Write list for NFS READ,
+ *     which has the same constraint. Factor in the inline
+ *     rsize as well.
+ */
+static size_t
+rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+       unsigned int inline_size, pages;
+
+       inline_size = min_t(unsigned int,
+                           cdata->inline_wsize, cdata->inline_rsize);
+       inline_size -= RPCRDMA_HDRLEN_MIN;
+       pages = inline_size / sizeof(struct rpcrdma_segment);
+       return pages << PAGE_SHIFT;
+}
+
+static size_t
+rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
+{
+       return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
+}
+
+size_t
+rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
+{
+       size_t result;
+
+       switch (r_xprt->rx_ia.ri_memreg_strategy) {
+       case RPCRDMA_ALLPHYSICAL:
+               result = rpcrdma_physical_max_payload(r_xprt);
+               break;
+       default:
+               result = rpcrdma_mr_max_payload(r_xprt);
+       }
+       return result;
+}