rds: Fix reference counting for xmit_atomic and xmit_rdma
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f15247acd72918737b2e7f17e698154e3254a8..209dbc6d159d76ec2daee343983d21ef515a1bdf 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
        complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
-                         struct rds_ib_send_work *send,
-                         int wc_status)
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+                                  struct rm_data_op *op,
+                                  int wc_status)
 {
-       struct rds_message *rm = send->s_rm;
-
-       rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
-
-       ib_dma_unmap_sg(ic->i_cm_id->device,
-                       rm->data.op_sg, rm->data.op_nents,
-                       DMA_TO_DEVICE);
+       if (op->op_nents)
+               ib_dma_unmap_sg(ic->i_cm_id->device,
+                               op->op_sg, op->op_nents,
+                               DMA_TO_DEVICE);
+}
 
-       if (rm->rdma.op_active) {
-               struct rm_rdma_op *op = &rm->rdma;
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+                                  struct rm_rdma_op *op,
+                                  int wc_status)
+{
+       if (op->op_mapped) {
+               ib_dma_unmap_sg(ic->i_cm_id->device,
+                               op->op_sg, op->op_nents,
+                               op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+               op->op_mapped = 0;
+       }
 
-               if (op->op_mapped) {
-                       ib_dma_unmap_sg(ic->i_cm_id->device,
-                                       op->op_sg, op->op_nents,
-                                       op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-                       op->op_mapped = 0;
-               }
+       /* If the user asked for a completion notification on this
+        * message, we can implement three different semantics:
+        *  1.  Notify when we received the ACK on the RDS message
+        *      that was queued with the RDMA. This provides reliable
+        *      notification of RDMA status at the expense of a one-way
+        *      packet delay.
+        *  2.  Notify when the IB stack gives us the completion event for
+        *      the RDMA operation.
+        *  3.  Notify when the IB stack gives us the completion event for
+        *      the accompanying RDS messages.
+        * Here, we implement approach #3. To implement approach #2,
+        * we would need to take an event for the rdma WR. To implement #1,
+        * don't call rds_rdma_send_complete at all, and fall back to the notify
+        * handling in the ACK processing code.
+        *
+        * Note: There's no need to explicitly sync any RDMA buffers using
+        * ib_dma_sync_sg_for_cpu - the completion for the RDMA
+        * operation itself unmapped the RDMA buffers, which takes care
+        * of synching.
+        */
+       rds_ib_send_complete(container_of(op, struct rds_message, rdma),
+                            wc_status, rds_rdma_send_complete);
 
-               /* If the user asked for a completion notification on this
-                * message, we can implement three different semantics:
-                *  1.  Notify when we received the ACK on the RDS message
-                *      that was queued with the RDMA. This provides reliable
-                *      notification of RDMA status at the expense of a one-way
-                *      packet delay.
-                *  2.  Notify when the IB stack gives us the completion event for
-                *      the RDMA operation.
-                *  3.  Notify when the IB stack gives us the completion event for
-                *      the accompanying RDS messages.
-                * Here, we implement approach #3. To implement approach #2,
-                * call rds_rdma_send_complete from the cq_handler. To implement #1,
-                * don't call rds_rdma_send_complete at all, and fall back to the notify
-                * handling in the ACK processing code.
-                *
-                * Note: There's no need to explicitly sync any RDMA buffers using
-                * ib_dma_sync_sg_for_cpu - the completion for the RDMA
-                * operation itself unmapped the RDMA buffers, which takes care
-                * of synching.
-                */
-               rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
+       if (op->op_write)
+               rds_stats_add(s_send_rdma_bytes, op->op_bytes);
+       else
+               rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
+}
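
The unmap helpers now receive only the embedded op, so the owning rds_message is recovered with container_of(). A minimal userspace sketch of that pointer arithmetic, using stand-in structures rather than the real RDS definitions:

#include <stddef.h>
#include <stdio.h>

/* Stand-ins for the kernel types; only the layout matters here. */
struct rm_rdma_op { int op_write; };
struct rds_message { int m_flags; struct rm_rdma_op rdma; };

/* Classic definition: subtract the member's offset from the member
 * pointer to get back to the containing structure. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

int main(void)
{
	struct rds_message rm = { .m_flags = 42 };
	struct rm_rdma_op *op = &rm.rdma;

	/* Recover the message from the op, as the unmap helpers do. */
	struct rds_message *back = container_of(op, struct rds_message, rdma);
	printf("%d\n", back->m_flags);	/* prints 42 */
	return 0;
}
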
 
-               if (rm->rdma.op_write)
-                       rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
-               else
-                       rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
+static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
+                                    struct rm_atomic_op *op,
+                                    int wc_status)
+{
+       /* unmap atomic recvbuf */
+       if (op->op_mapped) {
+               ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+                               DMA_FROM_DEVICE);
+               op->op_mapped = 0;
        }
 
-       if (rm->atomic.op_active) {
-               struct rm_atomic_op *op = &rm->atomic;
+       rds_ib_send_complete(container_of(op, struct rds_message, atomic),
+                            wc_status, rds_atomic_send_complete);
 
-               /* unmap atomic recvbuf */
-               if (op->op_mapped) {
-                       ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
-                                       DMA_FROM_DEVICE);
-                       op->op_mapped = 0;
-               }
-
-               rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete);
+       if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
+               rds_ib_stats_inc(s_ib_atomic_cswp);
+       else
+               rds_ib_stats_inc(s_ib_atomic_fadd);
+}
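
For orientation, the two atomic op types being counted here map loosely onto these single-word C11 primitives; this illustrates their semantics only (the HCA executes them against remote memory) and is not RDS code:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* RDS_ATOMIC_TYPE_FADD: add, returning the prior value. */
static uint64_t fadd(_Atomic uint64_t *target, uint64_t add)
{
	return atomic_fetch_add(target, add);
}

/* RDS_ATOMIC_TYPE_CSWP: swap only if the current value matches. */
static bool cswp(_Atomic uint64_t *target, uint64_t *expected, uint64_t swap)
{
	return atomic_compare_exchange_strong(target, expected, swap);
}
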
 
-               if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
-                       rds_stats_inc(s_atomic_cswp);
-               else
-                       rds_stats_inc(s_atomic_fadd);
+/*
+ * Unmap the resources associated with a struct send_work.
+ *
+ * Returns the rm for no good reason other than it is unobtainable
+ * other than by switching on wr.opcode, currently, and the caller,
+ * the event handler, needs it.
+ */
+static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
+                                               struct rds_ib_send_work *send,
+                                               int wc_status)
+{
+       struct rds_message *rm = NULL;
+
+       /* In the error case, wc.opcode sometimes contains garbage */
+       switch (send->s_wr.opcode) {
+       case IB_WR_SEND:
+               if (send->s_op) {
+                       rm = container_of(send->s_op, struct rds_message, data);
+                       rds_ib_send_unmap_data(ic, send->s_op, wc_status);
+               }
+               break;
+       case IB_WR_RDMA_WRITE:
+       case IB_WR_RDMA_READ:
+               if (send->s_op) {
+                       rm = container_of(send->s_op, struct rds_message, rdma);
+                       rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
+               }
+               break;
+       case IB_WR_ATOMIC_FETCH_AND_ADD:
+       case IB_WR_ATOMIC_CMP_AND_SWP:
+               if (send->s_op) {
+                       rm = container_of(send->s_op, struct rds_message, atomic);
+                       rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
+               }
+               break;
+       default:
+               if (printk_ratelimit())
+                       printk(KERN_NOTICE
+                              "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
+                              __func__, send->s_wr.opcode);
+               break;
        }
 
-       /* If anyone waited for this message to get flushed out, wake
-        * them up now */
-       rds_message_unmapped(rm);
+       send->s_wr.opcode = 0xdead;
 
-       rds_message_put(rm);
-       send->s_rm = NULL;
+       return rm;
 }
 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
                struct ib_sge *sge;
 
-               send->s_rm = NULL;
                send->s_op = NULL;
 
                send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
        u32 i;
 
        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-               if (!send->s_rm || send->s_wr.opcode == 0xdead)
-                       continue;
-               rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
+               if (send->s_op && send->s_wr.opcode != 0xdead)
+                       rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
        }
 }
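
The teardown sweep relies on that poisoning: a slot already reaped by the completion handler carries 0xdead and is skipped, so the two paths never unmap the same op twice. Schematically, with the same kind of simplified types:

#define OPCODE_DEAD 0xdead

struct slot { unsigned int opcode; void *op; };

/* Flush-time sweep: only slots the completion handler has not already
 * reaped (and poisoned) still hold resources to release. */
static void clear_ring(struct slot *ring, unsigned int n,
		       void (*unmap)(struct slot *s))
{
	for (unsigned int i = 0; i < n; i++)
		if (ring[i].op && ring[i].opcode != OPCODE_DEAD)
			unmap(&ring[i]);
}
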
 
@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 {
        struct rds_connection *conn = context;
        struct rds_ib_connection *ic = conn->c_transport_data;
+       struct rds_message *rm = NULL;
        struct ib_wc wc;
        struct rds_ib_send_work *send;
        u32 completed;
@@ -222,42 +263,19 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
                for (i = 0; i < completed; i++) {
                        send = &ic->i_sends[oldest];
 
-                       /* In the error case, wc.opcode sometimes contains garbage */
-                       switch (send->s_wr.opcode) {
-                       case IB_WR_SEND:
-                       case IB_WR_RDMA_WRITE:
-                       case IB_WR_RDMA_READ:
-                       case IB_WR_ATOMIC_FETCH_AND_ADD:
-                       case IB_WR_ATOMIC_CMP_AND_SWP:
-                               if (send->s_rm)
-                                       rds_ib_send_unmap_rm(ic, send, wc.status);
-                               break;
-                       default:
-                               if (printk_ratelimit())
-                                       printk(KERN_NOTICE
-                                               "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-                                               __func__, send->s_wr.opcode);
-                               break;
-                       }
+                       rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
-                       send->s_wr.opcode = 0xdead;
-                       send->s_wr.num_sge = 1;
                        if (send->s_queued + HZ/2 < jiffies)
                                rds_ib_stats_inc(s_ib_tx_stalled);
 
-                       /* If a RDMA operation produced an error, signal this right
-                        * away. If we don't, the subsequent SEND that goes with this
-                        * RDMA will be canceled with ERR_WFLUSH, and the application
-                        * never learn that the RDMA failed. */
-                       if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
-                               struct rds_message *rm;
-
-                               rm = rds_send_get_message(conn, send->s_op);
-                               if (rm) {
-                                       rds_ib_send_unmap_rm(ic, send, wc.status);
-                                       rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
-                                       rds_message_put(rm);
+                       if (send->s_op) {
+                               if (send->s_op == rm->m_final_op) {
+                                       /* If anyone waited for this message to get flushed out, wake
+                                        * them up now */
+                                       rds_message_unmapped(rm);
                                }
+                               rds_message_put(rm);
+                               send->s_op = NULL;
                        }
 
                        oldest = (oldest + 1) % ic->i_send_ring.w_nr;
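
Each op now holds exactly one message reference, taken at post time and dropped here; the flush wakeup fires only when the message's final op completes. A userspace sketch of that pairing, with a plain atomic counter standing in for the kernel refcount and illustrative names:

#include <stdatomic.h>
#include <stdlib.h>

struct message {
	atomic_int refcount;	/* one ref per posted op, plus callers' */
	void *final_op;		/* last op queued for this message */
};

static void msg_put(struct message *m)
{
	if (atomic_fetch_sub(&m->refcount, 1) == 1)
		free(m);	/* dropped the last reference */
}

/* Runs once per completed op that carries a reference (s_op != NULL). */
static void complete_op(struct message *m, void *op,
			void (*wake_unmapped)(struct message *))
{
	if (op == m->final_op)
		wake_unmapped(m);	/* all ops done: wake flush waiters */
	msg_put(m);
}
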
@@ -512,7 +530,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        }
 
        /* map the message the first time we see it */
-       if (!ic->i_rm) {
+       if (!ic->i_data_op) {
                if (rm->data.op_nents) {
                        rm->data.op_count = ib_dma_map_sg(dev,
                                                          rm->data.op_sg,
@@ -530,7 +548,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                }
 
                rds_message_addref(rm);
-               ic->i_rm = rm;
+               ic->i_data_op = &rm->data;
 
                /* Finalize the header */
                if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
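
The DMA mapping and the message reference are taken only on this first pass; a partial send leaves the data op stashed on the connection so the next call resumes it. Roughly, with hypothetical stand-in names:

struct data_op { int mapped; };
struct conn_state { struct data_op *inflight; };

/* First pass maps and pins; later passes resume the stashed op. */
static struct data_op *begin_xmit(struct conn_state *c, struct data_op *op)
{
	if (!c->inflight) {
		op->mapped = 1;		/* ib_dma_map_sg() in the kernel */
		c->inflight = op;	/* paired with rds_message_addref() */
	}
	return c->inflight;
}
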
@@ -583,7 +601,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        send = &ic->i_sends[pos];
        first = send;
        prev = NULL;
-       scat = &rm->data.op_sg[sg];
+       scat = &ic->i_data_op->op_sg[sg];
        i = 0;
        do {
                unsigned int len = 0;
@@ -658,9 +676,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 
        /* if we finished the message then send completion owns it */
        if (scat == &rm->data.op_sg[rm->data.op_count]) {
-               prev->s_rm = ic->i_rm;
+               prev->s_op = ic->i_data_op;
                prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-               ic->i_rm = NULL;
+               ic->i_data_op = NULL;
        }
 
        /* Put back wrs & credits we didn't use */
@@ -681,9 +699,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-               if (prev->s_rm) {
-                       ic->i_rm = prev->s_rm;
-                       prev->s_rm = NULL;
+               if (prev->s_op) {
+                       ic->i_data_op = prev->s_op;
+                       prev->s_op = NULL;
                }
 
                rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
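
So the data op is handed to the completion path only once the whole scatterlist has been posted, and is reclaimed from the last work entry if ib_post_send() fails. A sketch of that handoff and rollback, with simplified types:

struct work { void *op; };
struct conn { void *inflight; };

/* Final scatterlist entry posted: the last work entry owns the op. */
static void hand_off(struct conn *c, struct work *last)
{
	last->op = c->inflight;
	c->inflight = NULL;
}

/* post_send failed: pull the op back so a later call can retry it. */
static void roll_back(struct conn *c, struct work *last)
{
	if (last->op) {
		c->inflight = last->op;
		last->op = NULL;
	}
}
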
@@ -701,10 +719,9 @@ out:
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
-       struct rm_atomic_op *op = &rm->atomic;
        struct rds_ib_send_work *send = NULL;
        struct ib_send_wr *failed_wr;
        struct rds_ib_device *rds_ibdev;
@@ -740,14 +757,8 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
        send->s_wr.wr.atomic.rkey = op->op_rkey;
-
-       /*
-        * If there is no data or rdma ops in the message, then
-        * we must fill in s_rm ourselves, so we properly clean up
-        * on completion.
-        */
-       if (!rm->rdma.op_active && !rm->data.op_active)
-               send->s_rm = rm;
+       send->s_op = op;
+       rds_message_addref(container_of(send->s_op, struct rds_message, atomic));
 
        /* map 8 byte retval buffer to the device */
        ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
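
Note the ordering: the reference is taken before the WR reaches the hardware, because the completion handler, which drops it, can run on another CPU the moment the post succeeds. A minimal illustration of the pattern (the failure-path undo here is the generic idiom, not a claim about this exact kernel path):

#include <stdatomic.h>

struct msg { atomic_int ref; };

static int post_op(struct msg *m, int (*post_send)(void))
{
	/* Pin first: the completion that drops this reference may fire
	 * as soon as the work request is live. */
	atomic_fetch_add(&m->ref, 1);

	int ret = post_send();
	if (ret)
		atomic_fetch_sub(&m->ref, 1);	/* never posted: undo */
	return ret;
}
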
@@ -809,7 +820,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 
        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
-       /* map the message the first time we see it */
+       /* map the op the first time we see it */
        if (!op->op_mapped) {
                op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
                                             op->op_sg, op->op_nents, (op->op_write) ?
@@ -848,13 +859,13 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
        for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
                send->s_wr.send_flags = 0;
                send->s_queued = jiffies;
+               send->s_op = NULL;
 
                rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
                send->s_wr.wr.rdma.rkey = op->op_rkey;
-               send->s_op = op;
 
                if (num_sge > rds_ibdev->max_sge) {
                        send->s_wr.num_sge = rds_ibdev->max_sge;
@@ -890,6 +901,12 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                        send = ic->i_sends;
        }
 
+       /* give a reference to the last op */
+       if (scat == &op->op_sg[op->op_count]) {
+               prev->s_op = op;
+               rds_message_addref(container_of(op, struct rds_message, rdma));
+       }
+
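
An op spanning several WRs must still be put exactly once, hence s_op is cleared on every intermediate WR inside the loop and set, together with the message reference, only on the last. Sketch with stand-in types:

struct work { void *op; };

/* One RDMA op split over n work requests in a ring: intermediate WRs
 * carry no op pointer; only the last does, so the completion side
 * drops exactly one message reference per op. */
static void build_chain(struct work *ring, unsigned int ring_size,
			unsigned int first, unsigned int n, void *op)
{
	unsigned int slot = first;

	for (unsigned int i = 0; i < n; i++) {
		slot = (first + i) % ring_size;
		ring[slot].op = NULL;
	}
	ring[slot].op = op;	/* the reference rides on this WR */
}
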
        if (i < work_alloc) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
                work_alloc = i;