Verbs providers may perform house-keeping on the Send Queue during
each signaled send completion. It is necessary therefore for a verbs
consumer (like xprtrdma) to occasionally force a signaled send
completion if it runs unsignaled most of the time.
xprtrdma does not require signaled completions for Send or FastReg
Work Requests, but does signal some LocalInv Work Requests. To
ensure that Send Queue house-keeping can run before the Send Queue
is more than half-consumed, xprtrdma forces a signaled completion
on occasion by counting the number of Send Queue Entries it
consumes. It currently does this by counting each ib_post_send as
one Entry.
Commit
c9918ff56dfb ("xprtrdma: Add ro_unmap_sync method for FRWR")
introduced the ability for frwr_op_unmap_sync to post more than one
Work Request with a single post_send. Thus the underlying assumption
of one Send Queue Entry per ib_post_send is no longer true.
Also, FastReg Work Requests are currently never signaled. They
should be signaled once in a while, just as Send is, to keep the
accounting of consumed SQEs accurate.
While we're here, convert the CQCOUNT macros to the currently
preferred kernel coding style, which is inline functions.
Fixes: c9918ff56dfb ("xprtrdma: Add ro_unmap_sync method for FRWR")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
- DECR_CQCOUNT(&r_xprt->rx_ep);
+ rpcrdma_set_signaled(&r_xprt->rx_ep, ®_wr->wr);
rc = ib_post_send(ia->ri_id->qp, ®_wr->wr, &bad_wr);
if (rc)
goto out_senderr;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw, *tmp;
struct rpcrdma_frmr *f;
- int rc;
+ int count, rc;
dprintk("RPC: %s: req %p\n", __func__, req);
* a single ib_post_send() call.
*/
f = NULL;
+ count = 0;
invalidate_wrs = pos = prev = NULL;
list_for_each_entry(mw, &req->rl_registered, mw_list) {
if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
}
pos = __frwr_prepare_linv_wr(mw);
+ count++;
if (!invalidate_wrs)
invalidate_wrs = pos;
f->fr_invwr.send_flags = IB_SEND_SIGNALED;
f->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&f->fr_linv_done);
- INIT_CQCOUNT(&r_xprt->rx_ep);
+
+ /* Initialize CQ count, since there is always a signaled
+ * WR being posted here. The new cqcount depends on how
+ * many SQEs are about to be consumed.
+ */
+ rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
/* Transport disconnect drains the receive CQ before it
* replaces the QP. The RPC reply handler won't call us
ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
if (ep->rep_cqinit <= 2)
ep->rep_cqinit = 0; /* always signal? */
- INIT_CQCOUNT(ep);
+ rpcrdma_init_cqcount(ep, 0);
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr->num_sge);
- if (DECR_CQCOUNT(ep) > 0)
- send_wr->send_flags = 0;
- else { /* Provider must take a send completion every now and then */
- INIT_CQCOUNT(ep);
- send_wr->send_flags = IB_SEND_SIGNALED;
- }
-
+ rpcrdma_set_signaled(ep, send_wr);
rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
if (rc)
goto out_postsend_err;
struct delayed_work rep_connect_worker;
};
-#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
-#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+static inline void
+rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
+{
+ atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
+}
+
+/* To update send queue accounting, provider must take a
+ * send completion every now and then.
+ */
+static inline void
+rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
+{
+ send_wr->send_flags = 0;
+ if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
+ rpcrdma_init_cqcount(ep, 0);
+ send_wr->send_flags = IB_SEND_SIGNALED;
+ }
+}
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are