Merge tag 'nfs-rdma-for-3.20' of git://git.linux-nfs.org/projects/anna/nfs-rdma
author Trond Myklebust <trond.myklebust@primarydata.com>
Tue, 3 Feb 2015 16:53:18 +0000 (11:53 -0500)
committer Trond Myklebust <trond.myklebust@primarydata.com>
Tue, 3 Feb 2015 16:54:58 +0000 (11:54 -0500)
NFS: Client side changes for RDMA

These patches improve the scalability of the NFSoRDMA client and move large
variables off the stack.  Additionally, the GFP_* flags used when allocating
RPC buffers are updated to match what the TCP transport uses.
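
For reference, the flag change boils down to the following selection in
xprt_rdma_allocate() (see the net/sunrpc/xprtrdma/transport.c hunk below).
The wrapper function here is only an illustrative sketch, not part of the
patch; the real code assigns the same values to a local "flags" variable:

#include <linux/gfp.h>
#include <linux/sunrpc/sched.h>

/* Sketch: GFP flag selection for RPC/RDMA buffer allocation */
static gfp_t xprt_rdma_allocate_flags(struct rpc_task *task)
{
	/* A swapper task may dip into memory reserves but must not sleep */
	if (RPC_IS_SWAPPER(task))
		return __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	/* Normal I/O path: may sleep, but must not start new I/O */
	return GFP_NOIO | __GFP_NOWARN;
}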

Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
* tag 'nfs-rdma-for-3.20' of git://git.linux-nfs.org/projects/anna/nfs-rdma: (21 commits)
  xprtrdma: Update the GFP flags used in xprt_rdma_allocate()
  xprtrdma: Clean up after adding regbuf management
  xprtrdma: Allocate zero pad separately from rpcrdma_buffer
  xprtrdma: Allocate RPC/RDMA receive buffer separately from struct rpcrdma_rep
  xprtrdma: Allocate RPC/RDMA send buffer separately from struct rpcrdma_req
  xprtrdma: Allocate RPC send buffer separately from struct rpcrdma_req
  xprtrdma: Add struct rpcrdma_regbuf and helpers
  xprtrdma: Refactor rpcrdma_buffer_create() and rpcrdma_buffer_destroy()
  xprtrdma: Simplify synopsis of rpcrdma_buffer_create()
  xprtrdma: Take struct ib_qp_attr and ib_qp_init_attr off the stack
  xprtrdma: Take struct ib_device_attr off the stack
  xprtrdma: Free the pd if ib_query_qp() fails
  xprtrdma: Remove rpcrdma_ep::rep_func and ::rep_xprt
  xprtrdma: Move credit update to RPC reply handler
  xprtrdma: Remove rl_mr field, and the mr_chunk union
  xprtrdma: Remove rpcrdma_ep::rep_ia
  xprtrdma: Rename "xprt" and "rdma_connect" fields in struct rpcrdma_xprt
  xprtrdma: Clean up hdrlen
  xprtrdma: Display XIDs in host byte order
  xprtrdma: Modernize htonl and ntohl
  ...
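
Of the commits above, "xprtrdma: Move credit update to RPC reply handler"
replaces the old atomic rb_credits bookkeeping with a clamp applied while the
reply is parsed. The conversion to a congestion window is sketched here as a
standalone helper; the helper itself is illustrative, but the logic is taken
from the rpcrdma_reply_handler() hunk in net/sunrpc/xprtrdma/rpc_rdma.c below:

#include <linux/types.h>
#include <linux/sunrpc/xprt.h>

/* Sketch: clamp the server's credit grant and convert it to an RPC cwnd */
static unsigned long rpcrdma_credits_to_cwnd(u32 credits, u32 max_requests)
{
	if (credits == 0)
		credits = 1;			/* don't deadlock */
	else if (credits > max_requests)
		credits = max_requests;

	return (unsigned long)credits << RPC_CWNDSHIFT;
}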

include/linux/sunrpc/rpc_rdma.h
include/linux/sunrpc/svc_rdma.h
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index b78f16b1dea3a0d6a38262e7f1dd3845efce22b6..f33c5a4d6fe47fddb2ae57e4eac448df6ff810c8 100644
@@ -42,6 +42,9 @@
 
 #include <linux/types.h>
 
+#define RPCRDMA_VERSION                1
+#define rpcrdma_version                cpu_to_be32(RPCRDMA_VERSION)
+
 struct rpcrdma_segment {
        __be32 rs_handle;       /* Registered memory handle */
        __be32 rs_length;       /* Length of the chunk in bytes */
@@ -95,7 +98,10 @@ struct rpcrdma_msg {
        } rm_body;
 };
 
-#define RPCRDMA_HDRLEN_MIN     28
+/*
+ * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
+ */
+#define RPCRDMA_HDRLEN_MIN     (sizeof(__be32) * 7)
 
 enum rpcrdma_errcode {
        ERR_VERS = 1,
@@ -115,4 +121,10 @@ enum rpcrdma_proc {
        RDMA_ERROR = 4          /* An RPC RDMA encoding error */
 };
 
+#define rdma_msg       cpu_to_be32(RDMA_MSG)
+#define rdma_nomsg     cpu_to_be32(RDMA_NOMSG)
+#define rdma_msgp      cpu_to_be32(RDMA_MSGP)
+#define rdma_done      cpu_to_be32(RDMA_DONE)
+#define rdma_error     cpu_to_be32(RDMA_ERROR)
+
 #endif                         /* _LINUX_SUNRPC_RPC_RDMA_H */
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 975da754c778d35921eee53cddb2913cf5ef698e..ddfe88f522199cefb328d2d07df319c4e07622c1 100644
@@ -63,8 +63,6 @@ extern atomic_t rdma_stat_rq_prod;
 extern atomic_t rdma_stat_sq_poll;
 extern atomic_t rdma_stat_sq_prod;
 
-#define RPCRDMA_VERSION 1
-
 /*
  * Contexts are built when an RDMA request is created and are a
  * record of the resources that can be recovered when the request
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index df01d124936c32409f88e872e6e1b3c1bdb2e25c..7e9acd9361c55bae557fcc51681585434b1755ef 100644
@@ -209,9 +209,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
                if (cur_rchunk) {       /* read */
                        cur_rchunk->rc_discrim = xdr_one;
                        /* all read chunks have the same "position" */
-                       cur_rchunk->rc_position = htonl(pos);
-                       cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
-                       cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+                       cur_rchunk->rc_position = cpu_to_be32(pos);
+                       cur_rchunk->rc_target.rs_handle =
+                                               cpu_to_be32(seg->mr_rkey);
+                       cur_rchunk->rc_target.rs_length =
+                                               cpu_to_be32(seg->mr_len);
                        xdr_encode_hyper(
                                        (__be32 *)&cur_rchunk->rc_target.rs_offset,
                                        seg->mr_base);
@@ -222,8 +224,10 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
                        cur_rchunk++;
                        r_xprt->rx_stats.read_chunk_count++;
                } else {                /* write/reply */
-                       cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
-                       cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+                       cur_wchunk->wc_target.rs_handle =
+                                               cpu_to_be32(seg->mr_rkey);
+                       cur_wchunk->wc_target.rs_length =
+                                               cpu_to_be32(seg->mr_len);
                        xdr_encode_hyper(
                                        (__be32 *)&cur_wchunk->wc_target.rs_offset,
                                        seg->mr_base);
@@ -257,7 +261,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
                *iptr++ = xdr_zero;     /* encode a NULL reply chunk */
        } else {
                warray->wc_discrim = xdr_one;
-               warray->wc_nchunks = htonl(nchunks);
+               warray->wc_nchunks = cpu_to_be32(nchunks);
                iptr = (__be32 *) cur_wchunk;
                if (type == rpcrdma_writech) {
                        *iptr++ = xdr_zero; /* finish the write chunk list */
@@ -290,7 +294,7 @@ ssize_t
 rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
 {
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-       struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+       struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);
 
        if (req->rl_rtype != rpcrdma_noch)
                result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
@@ -402,13 +406,12 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        base = rqst->rq_svec[0].iov_base;
        rpclen = rqst->rq_svec[0].iov_len;
 
-       /* build RDMA header in private area at front */
-       headerp = (struct rpcrdma_msg *) req->rl_base;
-       /* don't htonl XID, it's already done in request */
+       headerp = rdmab_to_msg(req->rl_rdmabuf);
+       /* don't byte-swap XID, it's already done in request */
        headerp->rm_xid = rqst->rq_xid;
-       headerp->rm_vers = xdr_one;
-       headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
-       headerp->rm_type = htonl(RDMA_MSG);
+       headerp->rm_vers = rpcrdma_version;
+       headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+       headerp->rm_type = rdma_msg;
 
        /*
         * Chunks needed for results?
@@ -468,7 +471,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                return -EIO;
        }
 
-       hdrlen = 28; /*sizeof *headerp;*/
+       hdrlen = RPCRDMA_HDRLEN_MIN;
        padlen = 0;
 
        /*
@@ -482,11 +485,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                                                RPCRDMA_INLINE_PAD_VALUE(rqst));
 
                if (padlen) {
-                       headerp->rm_type = htonl(RDMA_MSGP);
+                       headerp->rm_type = rdma_msgp;
                        headerp->rm_body.rm_padded.rm_align =
-                               htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+                               cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
                        headerp->rm_body.rm_padded.rm_thresh =
-                               htonl(RPCRDMA_INLINE_PAD_THRESH);
+                               cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
                        headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
                        headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
                        headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
@@ -524,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
                " headerp 0x%p base 0x%p lkey 0x%x\n",
                __func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
-               headerp, base, req->rl_iov.lkey);
+               headerp, base, rdmab_lkey(req->rl_rdmabuf));
 
        /*
         * initialize send_iov's - normally only two: rdma chunk header and
@@ -533,26 +536,26 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         * header and any write data. In all non-rdma cases, any following
         * data has been copied into the RPC header buffer.
         */
-       req->rl_send_iov[0].addr = req->rl_iov.addr;
+       req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = hdrlen;
-       req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+       req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
 
-       req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+       req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
-       req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+       req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
 
        req->rl_niovs = 2;
 
        if (padlen) {
                struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
-               req->rl_send_iov[2].addr = ep->rep_pad.addr;
+               req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
                req->rl_send_iov[2].length = padlen;
-               req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+               req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
 
                req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
                req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
-               req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+               req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
 
                req->rl_niovs = 4;
        }
@@ -569,8 +572,9 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
 {
        unsigned int i, total_len;
        struct rpcrdma_write_chunk *cur_wchunk;
+       char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
 
-       i = ntohl(**iptrp);     /* get array count */
+       i = be32_to_cpu(**iptrp);
        if (i > max)
                return -1;
        cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
@@ -582,11 +586,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
                        xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
                        dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
                                __func__,
-                               ntohl(seg->rs_length),
+                               be32_to_cpu(seg->rs_length),
                                (unsigned long long)off,
-                               ntohl(seg->rs_handle));
+                               be32_to_cpu(seg->rs_handle));
                }
-               total_len += ntohl(seg->rs_length);
+               total_len += be32_to_cpu(seg->rs_length);
                ++cur_wchunk;
        }
        /* check and adjust for properly terminated write chunk */
@@ -596,7 +600,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
                        return -1;
                cur_wchunk = (struct rpcrdma_write_chunk *) w;
        }
-       if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+       if ((char *)cur_wchunk > base + rep->rr_len)
                return -1;
 
        *iptrp = (__be32 *) cur_wchunk;
@@ -691,7 +695,9 @@ rpcrdma_connect_worker(struct work_struct *work)
 {
        struct rpcrdma_ep *ep =
                container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
-       struct rpc_xprt *xprt = ep->rep_xprt;
+       struct rpcrdma_xprt *r_xprt =
+               container_of(ep, struct rpcrdma_xprt, rx_ep);
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 
        spin_lock_bh(&xprt->transport_lock);
        if (++xprt->connect_cookie == 0)        /* maintain a reserved value */
@@ -732,7 +738,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        struct rpc_xprt *xprt = rep->rr_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        __be32 *iptr;
-       int rdmalen, status;
+       int credits, rdmalen, status;
        unsigned long cwnd;
 
        /* Check status. If bad, signal disconnect and return rep to pool */
@@ -744,14 +750,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
                }
                return;
        }
-       if (rep->rr_len < 28) {
+       if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
                dprintk("RPC:       %s: short/invalid reply\n", __func__);
                goto repost;
        }
-       headerp = (struct rpcrdma_msg *) rep->rr_base;
-       if (headerp->rm_vers != xdr_one) {
+       headerp = rdmab_to_msg(rep->rr_rdmabuf);
+       if (headerp->rm_vers != rpcrdma_version) {
                dprintk("RPC:       %s: invalid version %d\n",
-                       __func__, ntohl(headerp->rm_vers));
+                       __func__, be32_to_cpu(headerp->rm_vers));
                goto repost;
        }
 
@@ -762,7 +768,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
                spin_unlock(&xprt->transport_lock);
                dprintk("RPC:       %s: reply 0x%p failed "
                        "to match any request xid 0x%08x len %d\n",
-                       __func__, rep, headerp->rm_xid, rep->rr_len);
+                       __func__, rep, be32_to_cpu(headerp->rm_xid),
+                       rep->rr_len);
 repost:
                r_xprt->rx_stats.bad_reply_count++;
                rep->rr_func = rpcrdma_reply_handler;
@@ -778,13 +785,14 @@ repost:
                spin_unlock(&xprt->transport_lock);
                dprintk("RPC:       %s: duplicate reply 0x%p to RPC "
                        "request 0x%p: xid 0x%08x\n", __func__, rep, req,
-                       headerp->rm_xid);
+                       be32_to_cpu(headerp->rm_xid));
                goto repost;
        }
 
        dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
                "                   RPC request 0x%p xid 0x%08x\n",
-                       __func__, rep, req, rqst, headerp->rm_xid);
+                       __func__, rep, req, rqst,
+                       be32_to_cpu(headerp->rm_xid));
 
        /* from here on, the reply is no longer an orphan */
        req->rl_reply = rep;
@@ -793,7 +801,7 @@ repost:
        /* check for expected message types */
        /* The order of some of these tests is important. */
        switch (headerp->rm_type) {
-       case htonl(RDMA_MSG):
+       case rdma_msg:
                /* never expect read chunks */
                /* never expect reply chunks (two ways to check) */
                /* never expect write chunks without having offered RDMA */
@@ -824,22 +832,24 @@ repost:
                } else {
                        /* else ordinary inline */
                        rdmalen = 0;
-                       iptr = (__be32 *)((unsigned char *)headerp + 28);
-                       rep->rr_len -= 28; /*sizeof *headerp;*/
+                       iptr = (__be32 *)((unsigned char *)headerp +
+                                                       RPCRDMA_HDRLEN_MIN);
+                       rep->rr_len -= RPCRDMA_HDRLEN_MIN;
                        status = rep->rr_len;
                }
                /* Fix up the rpc results for upper layer */
                rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
                break;
 
-       case htonl(RDMA_NOMSG):
+       case rdma_nomsg:
                /* never expect read or write chunks, always reply chunks */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    headerp->rm_body.rm_chunks[1] != xdr_zero ||
                    headerp->rm_body.rm_chunks[2] != xdr_one ||
                    req->rl_nchunks == 0)
                        goto badheader;
-               iptr = (__be32 *)((unsigned char *)headerp + 28);
+               iptr = (__be32 *)((unsigned char *)headerp +
+                                                       RPCRDMA_HDRLEN_MIN);
                rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
                if (rdmalen < 0)
                        goto badheader;
@@ -853,7 +863,7 @@ badheader:
                dprintk("%s: invalid rpcrdma reply header (type %d):"
                                " chunks[012] == %d %d %d"
                                " expected chunks <= %d\n",
-                               __func__, ntohl(headerp->rm_type),
+                               __func__, be32_to_cpu(headerp->rm_type),
                                headerp->rm_body.rm_chunks[0],
                                headerp->rm_body.rm_chunks[1],
                                headerp->rm_body.rm_chunks[2],
@@ -863,8 +873,14 @@ badheader:
                break;
        }
 
+       credits = be32_to_cpu(headerp->rm_credit);
+       if (credits == 0)
+               credits = 1;    /* don't deadlock */
+       else if (credits > r_xprt->rx_buf.rb_max_requests)
+               credits = r_xprt->rx_buf.rb_max_requests;
+
        cwnd = xprt->cwnd;
-       xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+       xprt->cwnd = credits << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
                xprt_release_rqst_cong(rqst->rq_task);
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index bbd6155d3e3454fa04ca6dcf129b866e44857b6d..2e192baa59f3d83841746a4ea5769d8680a5f039 100644
@@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 static void
 xprt_rdma_connect_worker(struct work_struct *work)
 {
-       struct rpcrdma_xprt *r_xprt =
-               container_of(work, struct rpcrdma_xprt, rdma_connect.work);
-       struct rpc_xprt *xprt = &r_xprt->xprt;
+       struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+                                                  rx_connect_worker.work);
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        int rc = 0;
 
        xprt_clear_connected(xprt);
@@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
 
        dprintk("RPC:       %s: called\n", __func__);
 
-       cancel_delayed_work_sync(&r_xprt->rdma_connect);
+       cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 
        xprt_clear_connected(xprt);
 
@@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args)
         * any inline data. Also specify any padding which will be provided
         * from a preregistered zero buffer.
         */
-       rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
-                               &new_xprt->rx_data);
+       rc = rpcrdma_buffer_create(new_xprt);
        if (rc)
                goto out3;
 
@@ -374,9 +373,8 @@ xprt_setup_rdma(struct xprt_create *args)
         * connection loss notification is async. We also catch connection loss
         * when reaping receives.
         */
-       INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
-       new_ep->rep_func = rpcrdma_conn_func;
-       new_ep->rep_xprt = xprt;
+       INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+                         xprt_rdma_connect_worker);
 
        xprt_rdma_format_addresses(xprt);
        xprt->max_payload = rpcrdma_max_payload(new_xprt);
@@ -434,94 +432,101 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 
        if (r_xprt->rx_ep.rep_connected != 0) {
                /* Reconnect */
-               schedule_delayed_work(&r_xprt->rdma_connect,
-                       xprt->reestablish_timeout);
+               schedule_delayed_work(&r_xprt->rx_connect_worker,
+                                     xprt->reestablish_timeout);
                xprt->reestablish_timeout <<= 1;
                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        } else {
-               schedule_delayed_work(&r_xprt->rdma_connect, 0);
+               schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
                if (!RPC_IS_ASYNC(task))
-                       flush_delayed_work(&r_xprt->rdma_connect);
+                       flush_delayed_work(&r_xprt->rx_connect_worker);
        }
 }
 
 /*
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
  */
 static void *
 xprt_rdma_allocate(struct rpc_task *task, size_t size)
 {
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-       struct rpcrdma_req *req, *nreq;
+       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       struct rpcrdma_regbuf *rb;
+       struct rpcrdma_req *req;
+       size_t min_size;
+       gfp_t flags;
 
-       req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+       req = rpcrdma_buffer_get(&r_xprt->rx_buf);
        if (req == NULL)
                return NULL;
 
-       if (size > req->rl_size) {
-               dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
-                       "prog %d vers %d proc %d\n",
-                       __func__, size, req->rl_size,
-                       task->tk_client->cl_prog, task->tk_client->cl_vers,
-                       task->tk_msg.rpc_proc->p_proc);
-               /*
-                * Outgoing length shortage. Our inline write max must have
-                * been configured to perform direct i/o.
-                *
-                * This is therefore a large metadata operation, and the
-                * allocate call was made on the maximum possible message,
-                * e.g. containing long filename(s) or symlink data. In
-                * fact, while these metadata operations *might* carry
-                * large outgoing payloads, they rarely *do*. However, we
-                * have to commit to the request here, so reallocate and
-                * register it now. The data path will never require this
-                * reallocation.
-                *
-                * If the allocation or registration fails, the RPC framework
-                * will (doggedly) retry.
-                */
-               if (task->tk_flags & RPC_TASK_SWAPPER)
-                       nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
-               else
-                       nreq = kmalloc(sizeof *req + size, GFP_NOFS);
-               if (nreq == NULL)
-                       goto outfail;
-
-               if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
-                               nreq->rl_base, size + sizeof(struct rpcrdma_req)
-                               - offsetof(struct rpcrdma_req, rl_base),
-                               &nreq->rl_handle, &nreq->rl_iov)) {
-                       kfree(nreq);
-                       goto outfail;
-               }
-               rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
-               nreq->rl_size = size;
-               nreq->rl_niovs = 0;
-               nreq->rl_nchunks = 0;
-               nreq->rl_buffer = (struct rpcrdma_buffer *)req;
-               nreq->rl_reply = req->rl_reply;
-               memcpy(nreq->rl_segments,
-                       req->rl_segments, sizeof nreq->rl_segments);
-               /* flag the swap with an unused field */
-               nreq->rl_iov.length = 0;
-               req->rl_reply = NULL;
-               req = nreq;
-       }
+       flags = GFP_NOIO | __GFP_NOWARN;
+       if (RPC_IS_SWAPPER(task))
+               flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
+
+       if (req->rl_rdmabuf == NULL)
+               goto out_rdmabuf;
+       if (req->rl_sendbuf == NULL)
+               goto out_sendbuf;
+       if (size > req->rl_sendbuf->rg_size)
+               goto out_sendbuf;
+
+out:
        dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
        req->rl_connect_cookie = 0;     /* our reserved value */
-       return req->rl_xdr_buf;
-
-outfail:
+       return req->rl_sendbuf->rg_base;
+
+out_rdmabuf:
+       min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+       rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+       if (IS_ERR(rb))
+               goto out_fail;
+       req->rl_rdmabuf = rb;
+
+out_sendbuf:
+       /* XDR encoding and RPC/RDMA marshaling of this request has not
+        * yet occurred. Thus a lower bound is needed to prevent buffer
+        * overrun during marshaling.
+        *
+        * RPC/RDMA marshaling may choose to send payload bearing ops
+        * inline, if the result is smaller than the inline threshold.
+        * The value of the "size" argument accounts for header
+        * requirements but not for the payload in these cases.
+        *
+        * Likewise, allocate enough space to receive a reply up to the
+        * size of the inline threshold.
+        *
+        * It's unlikely that both the send header and the received
+        * reply will be large, but slush is provided here to allow
+        * flexibility when marshaling.
+        */
+       min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+       min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+       if (size < min_size)
+               size = min_size;
+
+       rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+       if (IS_ERR(rb))
+               goto out_fail;
+       rb->rg_owner = req;
+
+       r_xprt->rx_stats.hardway_register_count += size;
+       rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+       req->rl_sendbuf = rb;
+       goto out;
+
+out_fail:
        rpcrdma_buffer_put(req);
-       rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+       r_xprt->rx_stats.failed_marshal_count++;
        return NULL;
 }
 
@@ -533,47 +538,24 @@ xprt_rdma_free(void *buffer)
 {
        struct rpcrdma_req *req;
        struct rpcrdma_xprt *r_xprt;
-       struct rpcrdma_rep *rep;
+       struct rpcrdma_regbuf *rb;
        int i;
 
        if (buffer == NULL)
                return;
 
-       req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
-       if (req->rl_iov.length == 0) {  /* see allocate above */
-               r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
-                                     struct rpcrdma_xprt, rx_buf);
-       } else
-               r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-       rep = req->rl_reply;
+       rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+       req = rb->rg_owner;
+       r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
 
-       dprintk("RPC:       %s: called on 0x%p%s\n",
-               __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+       dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-       /*
-        * Finish the deregistration.  The process is considered
-        * complete when the rr_func vector becomes NULL - this
-        * was put in place during rpcrdma_reply_handler() - the wait
-        * call below will not block if the dereg is "done". If
-        * interrupted, our framework will clean up.
-        */
        for (i = 0; req->rl_nchunks;) {
                --req->rl_nchunks;
                i += rpcrdma_deregister_external(
                        &req->rl_segments[i], r_xprt);
        }
 
-       if (req->rl_iov.length == 0) {  /* see allocate above */
-               struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
-               oreq->rl_reply = req->rl_reply;
-               (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
-                                                  req->rl_handle,
-                                                  &req->rl_iov);
-               kfree(req);
-               req = oreq;
-       }
-
-       /* Put back request+reply buffers */
        rpcrdma_buffer_put(req);
 }
 
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c98e40643910326abf13ecf70d3f570289175006..124676c1378089e807c3d330da1bd35a465f49ad 100644
@@ -49,6 +49,7 @@
 
 #include <linux/interrupt.h>
 #include <linux/slab.h>
+#include <linux/prefetch.h>
 #include <asm/bitops.h>
 
 #include "xprt_rdma.h"
@@ -153,7 +154,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
                event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
-               ep->rep_func(ep);
+               rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
 }
@@ -168,23 +169,59 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
                event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
-               ep->rep_func(ep);
+               rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
 }
 
+static const char * const wc_status[] = {
+       "success",
+       "local length error",
+       "local QP operation error",
+       "local EE context operation error",
+       "local protection error",
+       "WR flushed",
+       "memory management operation error",
+       "bad response error",
+       "local access error",
+       "remote invalid request error",
+       "remote access error",
+       "remote operation error",
+       "transport retry counter exceeded",
+       "RNR retrycounter exceeded",
+       "local RDD violation error",
+       "remove invalid RD request",
+       "operation aborted",
+       "invalid EE context number",
+       "invalid EE context state",
+       "fatal error",
+       "response timeout error",
+       "general error",
+};
+
+#define COMPLETION_MSG(status)                                 \
+       ((status) < ARRAY_SIZE(wc_status) ?                     \
+               wc_status[(status)] : "unexpected completion error")
+
 static void
 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 {
-       struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+       if (likely(wc->status == IB_WC_SUCCESS))
+               return;
 
-       dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
-               __func__, frmr, wc->status, wc->opcode);
+       /* WARNING: Only wr_id and status are reliable at this point */
+       if (wc->wr_id == 0ULL) {
+               if (wc->status != IB_WC_WR_FLUSH_ERR)
+                       pr_err("RPC:       %s: SEND: %s\n",
+                              __func__, COMPLETION_MSG(wc->status));
+       } else {
+               struct rpcrdma_mw *r;
 
-       if (wc->wr_id == 0ULL)
-               return;
-       if (wc->status != IB_WC_SUCCESS)
-               frmr->r.frmr.fr_state = FRMR_IS_STALE;
+               r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+               r->r.frmr.fr_state = FRMR_IS_STALE;
+               pr_err("RPC:       %s: frmr %p (stale): %s\n",
+                      __func__, r, COMPLETION_MSG(wc->status));
+       }
 }
 
 static int
@@ -248,33 +285,32 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
 
-       dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
-               __func__, rep, wc->status, wc->opcode, wc->byte_len);
+       /* WARNING: Only wr_id and status are reliable at this point */
+       if (wc->status != IB_WC_SUCCESS)
+               goto out_fail;
 
-       if (wc->status != IB_WC_SUCCESS) {
-               rep->rr_len = ~0U;
-               goto out_schedule;
-       }
+       /* status == SUCCESS means all fields in wc are trustworthy */
        if (wc->opcode != IB_WC_RECV)
                return;
 
+       dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
+               __func__, rep, wc->byte_len);
+
        rep->rr_len = wc->byte_len;
        ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
-                       rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
-
-       if (rep->rr_len >= 16) {
-               struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
-               unsigned int credits = ntohl(p->rm_credit);
-
-               if (credits == 0)
-                       credits = 1;    /* don't deadlock */
-               else if (credits > rep->rr_buffer->rb_max_requests)
-                       credits = rep->rr_buffer->rb_max_requests;
-               atomic_set(&rep->rr_buffer->rb_credits, credits);
-       }
+                                  rdmab_addr(rep->rr_rdmabuf),
+                                  rep->rr_len, DMA_FROM_DEVICE);
+       prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
        list_add_tail(&rep->rr_list, sched_list);
+       return;
+out_fail:
+       if (wc->status != IB_WC_WR_FLUSH_ERR)
+               pr_err("RPC:       %s: rep %p: %s\n",
+                      __func__, rep, COMPLETION_MSG(wc->status));
+       rep->rr_len = ~0U;
+       goto out_schedule;
 }
 
 static int
@@ -390,8 +426,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
 #endif
-       struct ib_qp_attr attr;
-       struct ib_qp_init_attr iattr;
+       struct ib_qp_attr *attr = &ia->ri_qp_attr;
+       struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
        int connstate = 0;
 
        switch (event->event) {
@@ -414,12 +450,13 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
                break;
        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
-               ib_query_qp(ia->ri_id->qp, &attr,
-                       IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
-                       &iattr);
+               ib_query_qp(ia->ri_id->qp, attr,
+                           IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
+                           iattr);
                dprintk("RPC:       %s: %d responder resources"
                        " (%d initiator)\n",
-                       __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
+                       __func__, attr->max_dest_rd_atomic,
+                       attr->max_rd_atomic);
                goto connected;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
@@ -436,11 +473,10 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                connstate = -ENODEV;
 connected:
-               atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
                dprintk("RPC:       %s: %sconnected\n",
                                        __func__, connstate > 0 ? "" : "dis");
                ep->rep_connected = connstate;
-               ep->rep_func(ep);
+               rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
                /*FALLTHROUGH*/
        default:
@@ -453,7 +489,7 @@ connected:
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        if (connstate == 1) {
-               int ird = attr.max_dest_rd_atomic;
+               int ird = attr->max_dest_rd_atomic;
                int tird = ep->rep_remote_cma.responder_resources;
                printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
                        "on %s, memreg %d slots %d ird %d%s\n",
@@ -554,8 +590,8 @@ int
 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 {
        int rc, mem_priv;
-       struct ib_device_attr devattr;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
+       struct ib_device_attr *devattr = &ia->ri_devattr;
 
        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
        if (IS_ERR(ia->ri_id)) {
@@ -571,26 +607,21 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
                goto out2;
        }
 
-       /*
-        * Query the device to determine if the requested memory
-        * registration strategy is supported. If it isn't, set the
-        * strategy to a globally supported model.
-        */
-       rc = ib_query_device(ia->ri_id->device, &devattr);
+       rc = ib_query_device(ia->ri_id->device, devattr);
        if (rc) {
                dprintk("RPC:       %s: ib_query_device failed %d\n",
                        __func__, rc);
-               goto out2;
+               goto out3;
        }
 
-       if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
+       if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
                ia->ri_have_dma_lkey = 1;
                ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
        }
 
        if (memreg == RPCRDMA_FRMR) {
                /* Requires both frmr reg and local dma lkey */
-               if ((devattr.device_cap_flags &
+               if ((devattr->device_cap_flags &
                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
                    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
                        dprintk("RPC:       %s: FRMR registration "
@@ -600,7 +631,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
                        /* Mind the ia limit on FRMR page list depth */
                        ia->ri_max_frmr_depth = min_t(unsigned int,
                                RPCRDMA_MAX_DATA_SEGS,
-                               devattr.max_fast_reg_page_list_len);
+                               devattr->max_fast_reg_page_list_len);
                }
        }
        if (memreg == RPCRDMA_MTHCAFMR) {
@@ -638,14 +669,14 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
                                "phys register failed with %lX\n",
                                __func__, PTR_ERR(ia->ri_bind_mem));
                        rc = -ENOMEM;
-                       goto out2;
+                       goto out3;
                }
                break;
        default:
                printk(KERN_ERR "RPC: Unsupported memory "
                                "registration mode: %d\n", memreg);
                rc = -ENOMEM;
-               goto out2;
+               goto out3;
        }
        dprintk("RPC:       %s: memory registration strategy is %d\n",
                __func__, memreg);
@@ -655,6 +686,10 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 
        rwlock_init(&ia->ri_qplock);
        return 0;
+
+out3:
+       ib_dealloc_pd(ia->ri_pd);
+       ia->ri_pd = NULL;
 out2:
        rdma_destroy_id(ia->ri_id);
        ia->ri_id = NULL;
@@ -698,20 +733,13 @@ int
 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                                struct rpcrdma_create_data_internal *cdata)
 {
-       struct ib_device_attr devattr;
+       struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
        int rc, err;
 
-       rc = ib_query_device(ia->ri_id->device, &devattr);
-       if (rc) {
-               dprintk("RPC:       %s: ib_query_device failed %d\n",
-                       __func__, rc);
-               return rc;
-       }
-
        /* check provider's send/recv wr limits */
-       if (cdata->max_requests > devattr.max_qp_wr)
-               cdata->max_requests = devattr.max_qp_wr;
+       if (cdata->max_requests > devattr->max_qp_wr)
+               cdata->max_requests = devattr->max_qp_wr;
 
        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
@@ -746,8 +774,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
                }
                ep->rep_attr.cap.max_send_wr *= depth;
-               if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
-                       cdata->max_requests = devattr.max_qp_wr / depth;
+               if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
+                       cdata->max_requests = devattr->max_qp_wr / depth;
                        if (!cdata->max_requests)
                                return -EINVAL;
                        ep->rep_attr.cap.max_send_wr = cdata->max_requests *
@@ -766,6 +794,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        ep->rep_attr.qp_type = IB_QPT_RC;
        ep->rep_attr.port_num = ~0;
 
+       if (cdata->padding) {
+               ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
+                                                     GFP_KERNEL);
+               if (IS_ERR(ep->rep_padbuf))
+                       return PTR_ERR(ep->rep_padbuf);
+       } else
+               ep->rep_padbuf = NULL;
+
        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
                "iovs: send %d recv %d\n",
                __func__,
@@ -781,7 +817,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        else if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;
        INIT_CQCOUNT(ep);
-       ep->rep_ia = ia;
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
@@ -831,10 +866,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
        /* Client offers RDMA Read but does not initiate */
        ep->rep_remote_cma.initiator_depth = 0;
-       if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
+       if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
                ep->rep_remote_cma.responder_resources = 32;
        else
-               ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
+               ep->rep_remote_cma.responder_resources =
+                                               devattr->max_qp_rd_atom;
 
        ep->rep_remote_cma.retry_count = 7;
        ep->rep_remote_cma.flow_control = 0;
@@ -848,6 +884,7 @@ out2:
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, err);
 out1:
+       rpcrdma_free_regbuf(ia, ep->rep_padbuf);
        return rc;
 }
 
@@ -874,11 +911,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                ia->ri_id->qp = NULL;
        }
 
-       /* padding - could be done in rpcrdma_buffer_destroy... */
-       if (ep->rep_pad_mr) {
-               rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
-               ep->rep_pad_mr = NULL;
-       }
+       rpcrdma_free_regbuf(ia, ep->rep_padbuf);
 
        rpcrdma_clean_cq(ep->rep_attr.recv_cq);
        rc = ib_destroy_cq(ep->rep_attr.recv_cq);
@@ -1048,6 +1081,48 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        }
 }
 
+static struct rpcrdma_req *
+rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_req *req;
+
+       req = kzalloc(sizeof(*req), GFP_KERNEL);
+       if (req == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       req->rl_buffer = &r_xprt->rx_buf;
+       return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_rep *rep;
+       int rc;
+
+       rc = -ENOMEM;
+       rep = kzalloc(sizeof(*rep), GFP_KERNEL);
+       if (rep == NULL)
+               goto out;
+
+       rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
+                                              GFP_KERNEL);
+       if (IS_ERR(rep->rr_rdmabuf)) {
+               rc = PTR_ERR(rep->rr_rdmabuf);
+               goto out_free;
+       }
+
+       rep->rr_buffer = &r_xprt->rx_buf;
+       return rep;
+
+out_free:
+       kfree(rep);
+out:
+       return ERR_PTR(rc);
+}
+
 static int
 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
 {
@@ -1134,27 +1209,26 @@ out_free:
 }
 
 int
-rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
-       struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
+rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        char *p;
-       size_t len, rlen, wlen;
+       size_t len;
        int i, rc;
 
        buf->rb_max_requests = cdata->max_requests;
        spin_lock_init(&buf->rb_lock);
-       atomic_set(&buf->rb_credits, 1);
 
        /* Need to allocate:
         *   1.  arrays for send and recv pointers
         *   2.  arrays of struct rpcrdma_req to fill in pointers
         *   3.  array of struct rpcrdma_rep for replies
-        *   4.  padding, if any
         * Send/recv buffers in req/rep need to be registered
         */
        len = buf->rb_max_requests *
                (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-       len += cdata->padding;
 
        p = kzalloc(len, GFP_KERNEL);
        if (p == NULL) {
@@ -1170,17 +1244,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
        buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
        p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
 
-       /*
-        * Register the zeroed pad buffer, if any.
-        */
-       if (cdata->padding) {
-               rc = rpcrdma_register_internal(ia, p, cdata->padding,
-                                           &ep->rep_pad_mr, &ep->rep_pad);
-               if (rc)
-                       goto out;
-       }
-       p += cdata->padding;
-
        INIT_LIST_HEAD(&buf->rb_mws);
        INIT_LIST_HEAD(&buf->rb_all);
        switch (ia->ri_memreg_strategy) {
@@ -1198,68 +1261,56 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
                break;
        }
 
-       /*
-        * Allocate/init the request/reply buffers. Doing this
-        * using kmalloc for now -- one for each buf.
-        */
-       wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
-       rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
-       dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
-               __func__, wlen, rlen);
-
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
                struct rpcrdma_rep *rep;
 
-               req = kmalloc(wlen, GFP_KERNEL);
-               if (req == NULL) {
+               req = rpcrdma_create_req(r_xprt);
+               if (IS_ERR(req)) {
                        dprintk("RPC:       %s: request buffer %d alloc"
                                " failed\n", __func__, i);
-                       rc = -ENOMEM;
+                       rc = PTR_ERR(req);
                        goto out;
                }
-               memset(req, 0, sizeof(struct rpcrdma_req));
                buf->rb_send_bufs[i] = req;
-               buf->rb_send_bufs[i]->rl_buffer = buf;
 
-               rc = rpcrdma_register_internal(ia, req->rl_base,
-                               wlen - offsetof(struct rpcrdma_req, rl_base),
-                               &buf->rb_send_bufs[i]->rl_handle,
-                               &buf->rb_send_bufs[i]->rl_iov);
-               if (rc)
-                       goto out;
-
-               buf->rb_send_bufs[i]->rl_size = wlen -
-                                               sizeof(struct rpcrdma_req);
-
-               rep = kmalloc(rlen, GFP_KERNEL);
-               if (rep == NULL) {
+               rep = rpcrdma_create_rep(r_xprt);
+               if (IS_ERR(rep)) {
                        dprintk("RPC:       %s: reply buffer %d alloc failed\n",
                                __func__, i);
-                       rc = -ENOMEM;
+                       rc = PTR_ERR(rep);
                        goto out;
                }
-               memset(rep, 0, sizeof(struct rpcrdma_rep));
                buf->rb_recv_bufs[i] = rep;
-               buf->rb_recv_bufs[i]->rr_buffer = buf;
-
-               rc = rpcrdma_register_internal(ia, rep->rr_base,
-                               rlen - offsetof(struct rpcrdma_rep, rr_base),
-                               &buf->rb_recv_bufs[i]->rr_handle,
-                               &buf->rb_recv_bufs[i]->rr_iov);
-               if (rc)
-                       goto out;
-
        }
-       dprintk("RPC:       %s: max_requests %d\n",
-               __func__, buf->rb_max_requests);
-       /* done */
+
        return 0;
 out:
        rpcrdma_buffer_destroy(buf);
        return rc;
 }
 
+static void
+rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+{
+       if (!rep)
+               return;
+
+       rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
+       kfree(rep);
+}
+
+static void
+rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+       if (!req)
+               return;
+
+       rpcrdma_free_regbuf(ia, req->rl_sendbuf);
+       rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
+       kfree(req);
+}
+
 static void
 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
 {
@@ -1315,18 +1366,10 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
        dprintk("RPC:       %s: entering\n", __func__);
 
        for (i = 0; i < buf->rb_max_requests; i++) {
-               if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
-                       rpcrdma_deregister_internal(ia,
-                                       buf->rb_recv_bufs[i]->rr_handle,
-                                       &buf->rb_recv_bufs[i]->rr_iov);
-                       kfree(buf->rb_recv_bufs[i]);
-               }
-               if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
-                       rpcrdma_deregister_internal(ia,
-                                       buf->rb_send_bufs[i]->rl_handle,
-                                       &buf->rb_send_bufs[i]->rl_iov);
-                       kfree(buf->rb_send_bufs[i]);
-               }
+               if (buf->rb_recv_bufs)
+                       rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
+               if (buf->rb_send_bufs)
+                       rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
        }
 
        switch (ia->ri_memreg_strategy) {
@@ -1450,8 +1493,8 @@ rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
        int i;
 
        for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
-               rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
-       rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
+               rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
+       rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
 }
 
 static void
@@ -1537,7 +1580,7 @@ rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
                        list_add(&r->mw_list, stale);
                        continue;
                }
-               req->rl_segments[i].mr_chunk.rl_mw = r;
+               req->rl_segments[i].rl_mw = r;
                if (unlikely(i-- == 0))
                        return req;     /* Success */
        }
@@ -1559,7 +1602,7 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
                r = list_entry(buf->rb_mws.next,
                               struct rpcrdma_mw, mw_list);
                list_del(&r->mw_list);
-               req->rl_segments[i].mr_chunk.rl_mw = r;
+               req->rl_segments[i].rl_mw = r;
                if (unlikely(i-- == 0))
                        return req;     /* Success */
        }
@@ -1658,8 +1701,6 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        unsigned long flags;
 
-       if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
-               buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_recv_index < buffers->rb_max_requests) {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
@@ -1688,7 +1729,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
  */
 
-int
+static int
 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
                                struct ib_mr **mrp, struct ib_sge *iov)
 {
@@ -1739,7 +1780,7 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
        return rc;
 }
 
-int
+static int
 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
                                struct ib_mr *mr, struct ib_sge *iov)
 {
@@ -1757,6 +1798,61 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
        return rc;
 }
 
+/**
+ * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
+ * @ia: controlling rpcrdma_ia
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns pointer to private header of an area of internally
+ * registered memory, or an ERR_PTR. The registered buffer follows
+ * the end of the private header.
+ *
+ * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
+ * receiving the payload of RDMA RECV operations. regbufs are not
+ * used for RDMA READ/WRITE operations, thus are registered only for
+ * LOCAL access.
+ */
+struct rpcrdma_regbuf *
+rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+{
+       struct rpcrdma_regbuf *rb;
+       int rc;
+
+       rc = -ENOMEM;
+       rb = kmalloc(sizeof(*rb) + size, flags);
+       if (rb == NULL)
+               goto out;
+
+       rb->rg_size = size;
+       rb->rg_owner = NULL;
+       rc = rpcrdma_register_internal(ia, rb->rg_base, size,
+                                      &rb->rg_mr, &rb->rg_iov);
+       if (rc)
+               goto out_free;
+
+       return rb;
+
+out_free:
+       kfree(rb);
+out:
+       return ERR_PTR(rc);
+}
+
+/**
+ * rpcrdma_free_regbuf - deregister and free registered buffer
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be deregistered and freed
+ */
+void
+rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+       if (rb) {
+               rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
+               kfree(rb);
+       }
+}
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
@@ -1799,7 +1895,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                        struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_mr_seg *seg1 = seg;
-       struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
+       struct rpcrdma_mw *mw = seg1->rl_mw;
        struct rpcrdma_frmr *frmr = &mw->r.frmr;
        struct ib_mr *mr = frmr->fr_mr;
        struct ib_send_wr fastreg_wr, *bad_wr;
@@ -1888,12 +1984,12 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
        struct ib_send_wr invalidate_wr, *bad_wr;
        int rc;
 
-       seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+       seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
 
        memset(&invalidate_wr, 0, sizeof invalidate_wr);
-       invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+       invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
        invalidate_wr.opcode = IB_WR_LOCAL_INV;
-       invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+       invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);
 
        read_lock(&ia->ri_qplock);
@@ -1903,7 +1999,7 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
        read_unlock(&ia->ri_qplock);
        if (rc) {
                /* Force rpcrdma_buffer_get() to retry */
-               seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
+               seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
                dprintk("RPC:       %s: failed ib_post_send for invalidate,"
                        " status %i\n", __func__, rc);
        }
@@ -1935,8 +2031,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
-       rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
-                               physaddrs, i, seg1->mr_dma);
+       rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
        if (rc) {
                dprintk("RPC:       %s: failed ib_map_phys_fmr "
                        "%u@0x%llx+%i (%d)... status %i\n", __func__,
@@ -1945,7 +2040,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
                while (i--)
                        rpcrdma_unmap_one(ia, --seg);
        } else {
-               seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
+               seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
                seg1->mr_base = seg1->mr_dma + pageoff;
                seg1->mr_nsegs = i;
                seg1->mr_len = len;
@@ -1962,7 +2057,7 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
        LIST_HEAD(l);
        int rc;
 
-       list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
+       list_add(&seg1->rl_mw->r.fmr->list, &l);
        rc = ib_unmap_fmr(&l);
        read_lock(&ia->ri_qplock);
        while (seg1->mr_nsegs--)
@@ -2104,11 +2199,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 
        recv_wr.next = NULL;
        recv_wr.wr_id = (u64) (unsigned long) rep;
-       recv_wr.sg_list = &rep->rr_iov;
+       recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;
 
        ib_dma_sync_single_for_cpu(ia->ri_id->device,
-               rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
+                                  rdmab_addr(rep->rr_rdmabuf),
+                                  rdmab_length(rep->rr_rdmabuf),
+                                  DMA_BIDIRECTIONAL);
 
        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
 
net/sunrpc/xprtrdma/xprt_rdma.h
index b799041b75bf9efd01dc0c3a5bb77cc832bdf020..c9d2a02f631b228540a9403c0e90d756e19deddc 100644 (file)
@@ -70,6 +70,9 @@ struct rpcrdma_ia {
        int                     ri_async_rc;
        enum rpcrdma_memreg     ri_memreg_strategy;
        unsigned int            ri_max_frmr_depth;
+       struct ib_device_attr   ri_devattr;
+       struct ib_qp_attr       ri_qp_attr;
+       struct ib_qp_init_attr  ri_qp_init_attr;
 };
 
 /*
@@ -83,13 +86,9 @@ struct rpcrdma_ep {
        atomic_t                rep_cqcount;
        int                     rep_cqinit;
        int                     rep_connected;
-       struct rpcrdma_ia       *rep_ia;
        struct ib_qp_init_attr  rep_attr;
        wait_queue_head_t       rep_connect_wait;
-       struct ib_sge           rep_pad;        /* holds zeroed pad */
-       struct ib_mr            *rep_pad_mr;    /* holds zeroed pad */
-       void                    (*rep_func)(struct rpcrdma_ep *);
-       struct rpc_xprt         *rep_xprt;      /* for rep_func */
+       struct rpcrdma_regbuf   *rep_padbuf;
        struct rdma_conn_param  rep_remote_cma;
        struct sockaddr_storage rep_remote_addr;
        struct delayed_work     rep_connect_worker;
@@ -106,6 +105,44 @@ struct rpcrdma_ep {
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
 #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
 
+/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
+ *
+ * The below structure appears at the front of a large region of kmalloc'd
+ * memory, which always starts on a good alignment boundary.
+ */
+
+struct rpcrdma_regbuf {
+       size_t                  rg_size;
+       struct rpcrdma_req      *rg_owner;
+       struct ib_mr            *rg_mr;
+       struct ib_sge           rg_iov;
+       __be32                  rg_base[0] __attribute__ ((aligned(256)));
+};
+
+static inline u64
+rdmab_addr(struct rpcrdma_regbuf *rb)
+{
+       return rb->rg_iov.addr;
+}
+
+static inline u32
+rdmab_length(struct rpcrdma_regbuf *rb)
+{
+       return rb->rg_iov.length;
+}
+
+static inline u32
+rdmab_lkey(struct rpcrdma_regbuf *rb)
+{
+       return rb->rg_iov.lkey;
+}
+
+static inline struct rpcrdma_msg *
+rdmab_to_msg(struct rpcrdma_regbuf *rb)
+{
+       return (struct rpcrdma_msg *)rb->rg_base;
+}
+
 enum rpcrdma_chunktype {
        rpcrdma_noch = 0,
        rpcrdma_readch,
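
Together with struct rpcrdma_regbuf above, these accessors are all a caller needs in order to describe a regbuf to the HCA. A rough sketch of how marshaling code might point the first send SGE at the registered RPC/RDMA header held in rl_rdmabuf (a field added to struct rpcrdma_req further down); the function name and hdrlen parameter are illustrative:

/* Hedged sketch: fill send_iov[0] from the req's header regbuf using
 * only the rdmab_* accessors, writing the fixed header fields through
 * rdmab_to_msg().
 */
static void rpcrdma_fill_hdr_sge(struct rpcrdma_req *req, size_t hdrlen,
				 __be32 xid)
{
	struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);

	headerp->rm_xid = xid;
	headerp->rm_vers = rpcrdma_version;

	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
}
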
@@ -134,22 +171,16 @@ enum rpcrdma_chunktype {
 /* temporary static scatter/gather max */
 #define RPCRDMA_MAX_DATA_SEGS  (64)    /* max scatter/gather */
 #define RPCRDMA_MAX_SEGS       (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
-#define MAX_RPCRDMAHDR (\
-       /* max supported RPC/RDMA header */ \
-       sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \
-       (sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32))
 
 struct rpcrdma_buffer;
 
 struct rpcrdma_rep {
-       unsigned int    rr_len;         /* actual received reply length */
-       struct rpcrdma_buffer *rr_buffer; /* home base for this structure */
-       struct rpc_xprt *rr_xprt;       /* needed for request/reply matching */
-       void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
-       struct list_head rr_list;       /* tasklet list */
-       struct ib_sge   rr_iov;         /* for posting */
-       struct ib_mr    *rr_handle;     /* handle for mem in rr_iov */
-       char    rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
+       unsigned int            rr_len;
+       struct rpcrdma_buffer   *rr_buffer;
+       struct rpc_xprt         *rr_xprt;
+       void                    (*rr_func)(struct rpcrdma_rep *);
+       struct list_head        rr_list;
+       struct rpcrdma_regbuf   *rr_rdmabuf;
 };
 
 /*
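
The inline receive area that used to live in rr_base is now a separate regbuf hanging off rr_rdmabuf, which is what rpcrdma_ep_post_recv() above hands to the HCA. A hedged sketch of how a reply structure could be wired up (the function name and the rsize parameter are illustrative):

/* Hedged sketch: create a reply structure whose receive buffer is a
 * locally-registered regbuf, so its rg_iov can be posted directly.
 */
static struct rpcrdma_rep *rpcrdma_create_rep_demo(struct rpcrdma_ia *ia,
						   size_t rsize)
{
	struct rpcrdma_rep *rep;
	int rc;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		return ERR_PTR(-ENOMEM);

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, rsize, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		kfree(rep);
		return ERR_PTR(rc);
	}
	return rep;
}
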
@@ -211,10 +242,7 @@ struct rpcrdma_mw {
  */
 
 struct rpcrdma_mr_seg {                /* chunk descriptors */
-       union {                         /* chunk memory handles */
-               struct ib_mr    *rl_mr;         /* if registered directly */
-               struct rpcrdma_mw *rl_mw;       /* if registered from region */
-       } mr_chunk;
+       struct rpcrdma_mw *rl_mw;       /* registered MR */
        u64             mr_base;        /* registration result */
        u32             mr_rkey;        /* registration result */
        u32             mr_len;         /* length of chunk or segment */
@@ -227,22 +255,26 @@ struct rpcrdma_mr_seg {           /* chunk descriptors */
 };
 
 struct rpcrdma_req {
-       size_t          rl_size;        /* actual length of buffer */
        unsigned int    rl_niovs;       /* 0, 2 or 4 */
        unsigned int    rl_nchunks;     /* non-zero if chunks */
        unsigned int    rl_connect_cookie;      /* retry detection */
        enum rpcrdma_chunktype  rl_rtype, rl_wtype;
        struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
        struct rpcrdma_rep      *rl_reply;/* holder for reply buffer */
-       struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
        struct ib_sge   rl_send_iov[4]; /* for active requests */
-       struct ib_sge   rl_iov;         /* for posting */
-       struct ib_mr    *rl_handle;     /* handle for mem in rl_iov */
-       char            rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
-       __u32           rl_xdr_buf[0];  /* start of returned rpc rq_buffer */
+       struct rpcrdma_regbuf *rl_rdmabuf;
+       struct rpcrdma_regbuf *rl_sendbuf;
+       struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
 };
-#define rpcr_to_rdmar(r) \
-       container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
+
+static inline struct rpcrdma_req *
+rpcr_to_rdmar(struct rpc_rqst *rqst)
+{
+       struct rpcrdma_regbuf *rb = container_of(rqst->rq_buffer,
+                                                struct rpcrdma_regbuf,
+                                                rg_base[0]);
+       return rb->rg_owner;
+}
 
 /*
  * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for
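
The new rpcr_to_rdmar() only works because the transport hands the RPC layer the payload of rl_sendbuf as rq_buffer and records the owning req in rg_owner. A sketch of both directions (function names are illustrative):

/* Hedged sketch: the forward binding done once per request, and the
 * reverse lookup that rpcr_to_rdmar() performs via container_of().
 */
static void rpcrdma_bind_sendbuf(struct rpc_rqst *rqst,
				 struct rpcrdma_req *req)
{
	req->rl_sendbuf->rg_owner = req;	/* set at allocation time */
	rqst->rq_buffer = req->rl_sendbuf->rg_base;
}

static struct rpcrdma_req *rpcrdma_req_of(struct rpc_rqst *rqst)
{
	return rpcr_to_rdmar(rqst);	/* rg_base -> regbuf -> rg_owner */
}
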
@@ -252,7 +284,6 @@ struct rpcrdma_req {
  */
 struct rpcrdma_buffer {
        spinlock_t      rb_lock;        /* protects indexes */
-       atomic_t        rb_credits;     /* most recent server credits */
        int             rb_max_requests;/* client max requests */
        struct list_head rb_mws;        /* optional memory windows/fmrs/frmrs */
        struct list_head rb_all;
@@ -318,16 +349,16 @@ struct rpcrdma_stats {
  * during unmount.
  */
 struct rpcrdma_xprt {
-       struct rpc_xprt         xprt;
+       struct rpc_xprt         rx_xprt;
        struct rpcrdma_ia       rx_ia;
        struct rpcrdma_ep       rx_ep;
        struct rpcrdma_buffer   rx_buf;
        struct rpcrdma_create_data_internal rx_data;
-       struct delayed_work     rdma_connect;
+       struct delayed_work     rx_connect_worker;
        struct rpcrdma_stats    rx_stats;
 };
 
-#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
+#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
 #define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
 
 /* Setting this to 0 ensures interoperability with early servers.
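
The renamed rx_xprt field keeps the container_of() behind rpcx_to_rdmax() working, and rx_connect_worker replaces the old rdma_connect work item. A small sketch of a transport method using both (illustrative only, not the patch's connect path):

/* Hedged sketch: a transport op receives the generic rpc_xprt, recovers
 * the RDMA-specific state, and kicks the renamed connect worker.
 */
static void xprt_rdma_connect_demo(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
}
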
@@ -358,9 +389,7 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
-int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *,
-                               struct rpcrdma_ia *,
-                               struct rpcrdma_create_data_internal *);
+int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
@@ -368,16 +397,16 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
-int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int,
-                               struct ib_mr **, struct ib_sge *);
-int rpcrdma_deregister_internal(struct rpcrdma_ia *,
-                               struct ib_mr *, struct ib_sge *);
-
 int rpcrdma_register_external(struct rpcrdma_mr_seg *,
                                int, int, struct rpcrdma_xprt *);
 int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
                                struct rpcrdma_xprt *);
 
+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
+                                           size_t, gfp_t);
+void rpcrdma_free_regbuf(struct rpcrdma_ia *,
+                        struct rpcrdma_regbuf *);
+
 /*
  * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
  */
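
With the simplified synopsis, callers pass only the rpcrdma_xprt and the buffer code reaches the ep, ia, and create data itself. A sketch of a caller under the new signature (error handling trimmed, names illustrative):

/* Hedged sketch: setup and teardown under the one-argument
 * rpcrdma_buffer_create() synopsis declared above.
 */
static int rpcrdma_buffers_demo(struct rpcrdma_xprt *new_xprt)
{
	int rc;

	rc = rpcrdma_buffer_create(new_xprt);
	if (rc)
		return rc;

	/* ... transport runs ... */

	rpcrdma_buffer_destroy(&new_xprt->rx_buf);
	return 0;
}
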