]> git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
xprtrdma: Disconnect on registration failure
authorChuck Lever <chuck.lever@oracle.com>
Wed, 28 May 2014 14:35:14 +0000 (10:35 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Wed, 4 Jun 2014 12:56:53 +0000 (08:56 -0400)
If rpcrdma_register_external() fails during request marshaling, the
current RPC request is killed. Instead, this RPC should be retried
after reconnecting the transport instance.

The most likely reason for registration failure with FRMR is a
failed post_send, which would be due to a remote transport
disconnect or memory exhaustion. These issues can be recovered
by a retry.

Problems encountered in the marshaling logic itself will not be
corrected by trying again, so these should still kill a request.

Now that we've added a clean exit for marshaling errors, take the
opportunity to defang some BUG_ON's.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c

index 77b84cfa5c775fa505303c38898f21ea5db7bf19..693966d3f33ba12c1220538ff58f632ae762562d 100644 (file)
@@ -77,6 +77,8 @@ static const char transfertypes[][12] = {
  * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
  * elements. Segments are then coalesced when registered, if possible
  * within the selected memreg mode.
+ *
+ * Returns positive number of segments converted, or a negative errno.
  */
 
 static int
@@ -103,12 +105,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                        /* alloc the pagelist for receiving buffer */
                        ppages[p] = alloc_page(GFP_ATOMIC);
                        if (!ppages[p])
-                               return 0;
+                               return -ENOMEM;
                }
                seg[n].mr_page = ppages[p];
                seg[n].mr_offset = (void *)(unsigned long) page_base;
                seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-               BUG_ON(seg[n].mr_len > PAGE_SIZE);
+               if (seg[n].mr_len > PAGE_SIZE)
+                       return -EIO;
                len -= seg[n].mr_len;
                ++n;
                ++p;
@@ -117,7 +120,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 
        /* Message overflows the seg array */
        if (len && n == nsegs)
-               return 0;
+               return -EIO;
 
        if (xdrbuf->tail[0].iov_len) {
                /* the rpcrdma protocol allows us to omit any trailing
@@ -126,7 +129,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                        return n;
                if (n == nsegs)
                        /* Tail remains, but we're out of segments */
-                       return 0;
+                       return -EIO;
                seg[n].mr_page = NULL;
                seg[n].mr_offset = xdrbuf->tail[0].iov_base;
                seg[n].mr_len = xdrbuf->tail[0].iov_len;
@@ -167,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
  */
 
-static unsigned int
+static ssize_t
 rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
                struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
 {
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-       int nsegs, nchunks = 0;
+       int n, nsegs, nchunks = 0;
        unsigned int pos;
        struct rpcrdma_mr_seg *seg = req->rl_segments;
        struct rpcrdma_read_chunk *cur_rchunk = NULL;
@@ -201,11 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
                pos = target->head[0].iov_len;
 
        nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
-       if (nsegs == 0)
-               return 0;
+       if (nsegs < 0)
+               return nsegs;
 
        do {
-               int n = rpcrdma_register_external(seg, nsegs,
+               n = rpcrdma_register_external(seg, nsegs,
                                                cur_wchunk != NULL, r_xprt);
                if (n <= 0)
                        goto out;
@@ -277,7 +282,7 @@ out:
        for (pos = 0; nchunks--;)
                pos += rpcrdma_deregister_external(
                                &req->rl_segments[pos], r_xprt);
-       return 0;
+       return n;
 }
 
 /*
@@ -359,6 +364,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
  *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
  *  [2] -- optional padding.
  *  [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
  */
 
 int
@@ -368,7 +375,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        char *base;
-       size_t hdrlen, rpclen, padlen;
+       size_t rpclen, padlen;
+       ssize_t hdrlen;
        enum rpcrdma_chunktype rtype, wtype;
        struct rpcrdma_msg *headerp;
 
@@ -439,7 +447,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        /* The following simplification is not true forever */
        if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
                wtype = rpcrdma_noch;
-       BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
+       if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+               dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
+                       __func__);
+               return -EIO;
+       }
 
        hdrlen = 28; /*sizeof *headerp;*/
        padlen = 0;
@@ -464,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                        headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
                        headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
                        hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-                       BUG_ON(wtype != rpcrdma_noch);
-
+                       if (wtype != rpcrdma_noch) {
+                               dprintk("RPC:       %s: invalid chunk list\n",
+                                       __func__);
+                               return -EIO;
+                       }
                } else {
                        headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
                        headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
@@ -500,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                hdrlen = rpcrdma_create_chunks(rqst,
                                        &rqst->rq_rcv_buf, headerp, wtype);
        }
-
-       if (hdrlen == 0)
-               return -1;
+       if (hdrlen < 0)
+               return hdrlen;
 
        dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
                " headerp 0x%p base 0x%p lkey 0x%x\n",
index 93fe7753ff94aea04c118f5242cf35968b603962..66f91f0d071a9bbdec3e440aaa09c94df57ffbe0 100644 (file)
@@ -595,13 +595,12 @@ xprt_rdma_send_request(struct rpc_task *task)
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       int rc;
 
-       /* marshal the send itself */
-       if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
-               r_xprt->rx_stats.failed_marshal_count++;
-               dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
-                       __func__);
-               return -EIO;
+       if (req->rl_niovs == 0) {
+               rc = rpcrdma_marshal_req(rqst);
+               if (rc < 0)
+                       goto failed_marshal;
        }
 
        if (req->rl_reply == NULL)              /* e.g. reconnection */
@@ -625,6 +624,12 @@ xprt_rdma_send_request(struct rpc_task *task)
        rqst->rq_bytes_sent = 0;
        return 0;
 
+failed_marshal:
+       r_xprt->rx_stats.failed_marshal_count++;
+       dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n",
+               __func__, rc);
+       if (rc == -EIO)
+               return -EIO;
 drop_connection:
        xprt_disconnect_done(xprt);
        return -ENOTCONN;       /* implies disconnect */