};
static const char transfertypes[][12] = {
- "pure inline", /* no chunks */
- " read chunk", /* some argument via rdma read */
- "*read chunk", /* entire request via rdma read */
- "write chunk", /* some result via rdma write */
+ "inline", /* no chunks */
+ "read list", /* some argument via rdma read */
+ "*read list", /* entire request via rdma read */
+ "write list", /* some result via rdma write */
"reply chunk" /* entire reply via rdma write */
};
/* Returns size of largest RPC-over-RDMA header in a Call message
*
- * The client marshals only one chunk list per Call message.
- * The largest list is the Read list.
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
*/
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
maxsegs += 2; /* segment for head and tail buffers */
size = maxsegs * sizeof(struct rpcrdma_read_chunk);
+ /* Minimal Read chunk size */
+ size += sizeof(__be32); /* segment count */
+ size += sizeof(struct rpcrdma_segment);
+ size += sizeof(__be32); /* list discriminator */
+
dprintk("RPC: %s: max call header size = %u\n",
__func__, size);
return size;
return n;
}
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+ *iptr++ = cpu_to_be32(seg->mr_rkey);
+ *iptr++ = cpu_to_be32(seg->mr_len);
+ return xdr_encode_hyper(iptr, seg->mr_base);
+}
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ * Read chunklist (a linked list):
+ * N elements, position P (same P for all chunks of same arg!):
+ * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, struct rpc_rqst *rqst,
+ __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ unsigned int pos;
+ int n, nsegs;
+
+ if (rtype == rpcrdma_noch) {
+ *iptr++ = xdr_zero; /* item not present */
+ return iptr;
+ }
+
+ pos = rqst->rq_snd_buf.head[0].iov_len;
+ if (rtype == rpcrdma_areadch)
+ pos = 0;
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+ RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ if (nsegs < 0)
+ return ERR_PTR(nsegs);
+
+ do {
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+ if (n <= 0)
+ return ERR_PTR(n);
+
+ *iptr++ = xdr_one; /* item present */
+
+ /* All read segments in this chunk
+ * have the same "position".
+ */
+ *iptr++ = cpu_to_be32(pos);
+ iptr = xdr_encode_rdma_segment(iptr, seg);
+
+ dprintk("RPC: %5u %s: read segment pos %u "
+ "%d@0x%016llx:0x%08x (%s)\n",
+ rqst->rq_task->tk_pid, __func__, pos,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+
+ r_xprt->rx_stats.read_chunk_count++;
+ req->rl_nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+ req->rl_nextseg = seg;
+
+ /* Finish Read list */
+ *iptr++ = xdr_zero; /* Next item not present */
+ return iptr;
+}
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ * Write chunklist (a list of (one) counted array):
+ * N elements:
+ * 1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ struct rpc_rqst *rqst, __be32 *iptr,
+ enum rpcrdma_chunktype wtype)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ int n, nsegs, nchunks;
+ __be32 *segcount;
+
+ if (wtype != rpcrdma_writech) {
+ *iptr++ = xdr_zero; /* no Write list present */
+ return iptr;
+ }
+
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+ rqst->rq_rcv_buf.head[0].iov_len,
+ wtype, seg,
+ RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ if (nsegs < 0)
+ return ERR_PTR(nsegs);
+
+ *iptr++ = xdr_one; /* Write list present */
+ segcount = iptr++; /* save location of segment count */
+
+ nchunks = 0;
+ do {
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+ if (n <= 0)
+ return ERR_PTR(n);
+
+ iptr = xdr_encode_rdma_segment(iptr, seg);
+
+ dprintk("RPC: %5u %s: write segment "
+ "%d@0x016%llx:0x%08x (%s)\n",
+ rqst->rq_task->tk_pid, __func__,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+
+ r_xprt->rx_stats.write_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+ req->rl_nchunks++;
+ nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+ req->rl_nextseg = seg;
+
+ /* Update count of segments in this Write chunk */
+ *segcount = cpu_to_be32(nchunks);
+
+ /* Finish Write list */
+ *iptr++ = xdr_zero; /* Next item not present */
+ return iptr;
+}
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ * Reply chunk (a counted array):
+ * N elements:
+ * 1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, struct rpc_rqst *rqst,
+ __be32 *iptr, enum rpcrdma_chunktype wtype)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+ int n, nsegs, nchunks;
+ __be32 *segcount;
+
+ if (wtype != rpcrdma_replych) {
+ *iptr++ = xdr_zero; /* no Reply chunk present */
+ return iptr;
+ }
+
+ nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+ RPCRDMA_MAX_SEGS - req->rl_nchunks);
+ if (nsegs < 0)
+ return ERR_PTR(nsegs);
+
+ *iptr++ = xdr_one; /* Reply chunk present */
+ segcount = iptr++; /* save location of segment count */
+
+ nchunks = 0;
+ do {
+ n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+ if (n <= 0)
+ return ERR_PTR(n);
+
+ iptr = xdr_encode_rdma_segment(iptr, seg);
+
+ dprintk("RPC: %5u %s: reply segment "
+ "%d@0x%016llx:0x%08x (%s)\n",
+ rqst->rq_task->tk_pid, __func__,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+
+ r_xprt->rx_stats.reply_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+ req->rl_nchunks++;
+ nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+ req->rl_nextseg = seg;
+
+ /* Update count of segments in the Reply chunk */
+ *segcount = cpu_to_be32(nchunks);
+
+ return iptr;
+}
+
/*
* Copy write data inline.
* This function is used for "small" requests. Data which is passed
struct rpc_xprt *xprt = rqst->rq_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- char *base;
- size_t rpclen;
- ssize_t hdrlen;
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+ unsigned int pos;
+ ssize_t hdrlen;
+ size_t rpclen;
+ __be32 *iptr;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
return rpcrdma_bc_marshal_reply(rqst);
#endif
- /*
- * rpclen gets amount of data in first buffer, which is the
- * pre-registered buffer.
- */
- base = rqst->rq_svec[0].iov_base;
- rpclen = rqst->rq_svec[0].iov_len;
-
headerp = rdmab_to_msg(req->rl_rdmabuf);
/* don't byte-swap XID, it's already done in request */
headerp->rm_xid = rqst->rq_xid;
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
rtype = rpcrdma_noch;
+ rpcrdma_inline_pullup(rqst);
+ rpclen = rqst->rq_svec[0].iov_len;
} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
+ rpclen = rqst->rq_svec[0].iov_len;
+ rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
} else {
r_xprt->rx_stats.nomsg_call_count++;
headerp->rm_type = htonl(RDMA_NOMSG);
rpclen = 0;
}
- /* The following simplification is not true forever */
- if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
- wtype = rpcrdma_noch;
- if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
- dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
- __func__);
- return -EIO;
- }
-
- hdrlen = RPCRDMA_HDRLEN_MIN;
-
- /*
- * Pull up any extra send data into the preregistered buffer.
- * When padding is in use and applies to the transfer, insert
- * it and change the message type.
+ /* This implementation supports the following combinations
+ * of chunk lists in one RPC-over-RDMA Call message:
+ *
+ * - Read list
+ * - Write list
+ * - Reply chunk
+ * - Read list + Reply chunk
+ *
+ * It might not yet support the following combinations:
+ *
+ * - Read list + Write list
+ *
+ * It does not support the following combinations:
+ *
+ * - Write list + Reply chunk
+ * - Read list + Write list + Reply chunk
+ *
+ * This implementation supports only a single chunk in each
+ * Read or Write list. Thus for example the client cannot
+ * send a Call message with a Position Zero Read chunk and a
+ * regular Read chunk at the same time.
*/
- if (rtype == rpcrdma_noch) {
-
- rpcrdma_inline_pullup(rqst);
-
- headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
- /* new length after pullup */
- rpclen = rqst->rq_svec[0].iov_len;
- } else if (rtype == rpcrdma_readch)
- rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
- if (rtype != rpcrdma_noch) {
- hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
- headerp, rtype);
- wtype = rtype; /* simplify dprintk */
-
- } else if (wtype != rpcrdma_noch) {
- hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
- headerp, wtype);
- }
- if (hdrlen < 0)
- return hdrlen;
+ req->rl_nchunks = 0;
+ req->rl_nextseg = req->rl_segments;
+ iptr = headerp->rm_body.rm_chunks;
+ iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+ if (IS_ERR(iptr))
+ goto out_unmap;
+ iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+ if (IS_ERR(iptr))
+ goto out_unmap;
+ iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+ if (IS_ERR(iptr))
+ goto out_unmap;
+ hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
goto out_overflow;
- dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
- " headerp 0x%p base 0x%p lkey 0x%x\n",
- __func__, transfertypes[wtype], hdrlen, rpclen,
- headerp, base, rdmab_lkey(req->rl_rdmabuf));
+ dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+ rqst->rq_task->tk_pid, __func__,
+ transfertypes[rtype], transfertypes[wtype],
+ hdrlen, rpclen);
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = hdrlen;
return 0;
out_overflow:
- pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s\n",
- hdrlen, rpclen, transfertypes[wtype]);
+ pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+ hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
/* Terminate this RPC. Chunks registered above will be
* released by xprt_release -> xprt_rmda_free .
*/
return -EIO;
+
+out_unmap:
+ for (pos = 0; req->rl_nchunks--;)
+ pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+ &req->rl_segments[pos]);
+ return PTR_ERR(iptr);
}
/*