git.karo-electronics.de Git - mv-sheeva.git/commitdiff
RDS/IB: track signaled sends
authorZach Brown <zach.brown@oracle.com>
Wed, 14 Jul 2010 20:55:35 +0000 (13:55 -0700)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:16:40 +0000 (18:16 -0700)
We're seeing bugs today where IB connection shutdown clears the send
ring while the tasklet is processing completed sends.  Implementation
details cause this to dereference a null pointer.  Shutdown needs to
wait for send completion to stop before tearing down the connection.  We
can't simply wait for the ring to empty because it may contain
unsignaled sends that will never be processed.

This patch tracks the number of signaled sends that we've posted and
waits for them to complete.  It also makes sure that the tasklet has
finished executing.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_send.c

index acda2dbc657624e81ed52eb69b7b7d0dcc90b4e1..a13ced504145fbb13c38aa1b96e15d2f351e73cd 100644 (file)
@@ -108,6 +108,7 @@ struct rds_ib_connection {
        struct rds_header       *i_send_hdrs;
        u64                     i_send_hdrs_dma;
        struct rds_ib_send_work *i_sends;
+       atomic_t                i_signaled_sends;
 
        /* rx */
        struct tasklet_struct   i_recv_tasklet;
index 10f6a8815cd0e95b5b128cad6167eb4d5dd19f30..123c7d33b54e78b80b2dc3927b2bd66a2ce029ba 100644 (file)
@@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                }
 
                /*
-                * Don't wait for the send ring to be empty -- there may be completed
-                * non-signaled entries sitting on there. We unmap these below.
+                * We want to wait for tx and rx completion to finish
+                * before we tear down the connection, but we have to be
+                * careful not to get stuck waiting on a send ring that
+                * only has unsignaled sends in it.  We've shutdown new
+                * sends before getting here so by waiting for signaled
+                * sends to complete we're ensured that there will be no
+                * more tx processing.
                 */
                wait_event(rds_ib_ring_empty_wait,
-                       rds_ib_ring_empty(&ic->i_recv_ring));
+                          rds_ib_ring_empty(&ic->i_recv_ring) &&
+                          (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_recv_tasklet);
 
                if (ic->i_send_hdrs)
                        ib_dma_free_coherent(dev,
@@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
 #endif
+       atomic_set(&ic->i_signaled_sends, 0);
 
        /*
         * rds_ib_conn_shutdown() waits for these to be emptied so they
index e88cb4af009b8b7eff61829fc2312dfce569a3a7..15f75692574c4f50dd40deefbb642b30bc54515a 100644 (file)
@@ -219,6 +219,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
        }
 }
 
+/*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+       if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+           waitqueue_active(&rds_ib_ring_empty_wait))
+               wake_up(&rds_ib_ring_empty_wait);
+       BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
 /*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
        u32 oldest;
        u32 i = 0;
        int ret;
+       int nr_sig = 0;
 
        rdsdebug("cq %p conn %p\n", cq, conn);
        rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 
                for (i = 0; i < completed; i++) {
                        send = &ic->i_sends[oldest];
+                       if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                               nr_sig++;
 
                        rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
                }
 
                rds_ib_ring_free(&ic->i_send_ring, completed);
+               rds_ib_sub_signaled(ic, nr_sig);
+               nr_sig = 0;
 
                if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
                    test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-                                             struct rds_ib_send_work *send,
-                                             bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+                                            struct rds_ib_send_work *send,
+                                            bool notify)
 {
        /*
         * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
        if (ic->i_unsignaled_wrs-- == 0 || notify) {
                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                send->s_wr.send_flags |= IB_SEND_SIGNALED;
+               return 1;
        }
+       return 0;
 }
 
 /*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        int bytes_sent = 0;
        int ret;
        int flow_controlled = 0;
+       int nr_sig = 0;
 
        BUG_ON(off % RDS_FRAG_SIZE);
        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
+
                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        if (ic->i_flowctl && i < credit_alloc)
                rds_ib_send_add_credits(conn, credit_alloc - i);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        /* XXX need to worry about failed_wr and partial sends. */
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                if (prev->s_op) {
                        ic->i_data_op = prev->s_op;
                        prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
        u32 pos;
        u32 work_alloc;
        int ret;
+       int nr_sig = 0;
 
        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                send->s_wr.wr.atomic.compare_add = op->op_swap_add;
                send->s_wr.wr.atomic.swap = 0;
        }
-       rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+       nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
        send->s_wr.num_sge = 1;
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
                 send->s_sge[0].addr, send->s_sge[0].length);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &send->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }
 
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
        int sent;
        int ret;
        int num_sge;
+       int nr_sig = 0;
 
        /* map the op the first time we see it */
        if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                send->s_queued = jiffies;
                send->s_op = NULL;
 
-               rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+               nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                work_alloc = i;
        }
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }