]> git.karo-electronics.de Git - mv-sheeva.git/blobdiff - net/rds/connection.c
Merge branch 'master' into csb1725
[mv-sheeva.git] / net / rds / connection.c
index 7619b671ca2829f0e197a93f6abd06df2f108229..9334d892366ebb16091bd18fbdaacc9697eabb2a 100644 (file)
@@ -37,7 +37,6 @@
 
 #include "rds.h"
 #include "loop.h"
-#include "rdma.h"
 
 #define RDS_CONNECTION_HASH_BITS 12
 #define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
@@ -63,18 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
                var |= RDS_INFO_CONNECTION_FLAG_##suffix;       \
 } while (0)
 
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
-       int ret = 0;
-
-       if (!mutex_trylock(&conn->c_send_lock))
-               ret = 1;
-       else
-               mutex_unlock(&conn->c_send_lock);
-
-       return ret;
-}
-
+/* rcu read lock must be held or the connection spinlock */
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
                                              __be32 laddr, __be32 faddr,
                                              struct rds_transport *trans)
@@ -82,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
        struct rds_connection *conn, *ret = NULL;
        struct hlist_node *pos;
 
-       hlist_for_each_entry(conn, pos, head, c_hash_node) {
+       hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
                if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
                                conn->c_trans == trans) {
                        ret = conn;
@@ -100,7 +88,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
  * and receiving over this connection again in the future.  It is up to
  * the transport to have serialized this call with its send and recv.
  */
-void rds_conn_reset(struct rds_connection *conn)
+static void rds_conn_reset(struct rds_connection *conn)
 {
        rdsdebug("connection %pI4 to %pI4 reset\n",
          &conn->c_laddr, &conn->c_faddr);
@@ -129,10 +117,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 {
        struct rds_connection *conn, *parent = NULL;
        struct hlist_head *head = rds_conn_bucket(laddr, faddr);
+       struct rds_transport *loop_trans;
        unsigned long flags;
        int ret;
 
-       spin_lock_irqsave(&rds_conn_lock, flags);
+       rcu_read_lock();
        conn = rds_conn_lookup(head, laddr, faddr, trans);
        if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
            !is_outgoing) {
@@ -143,12 +132,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                parent = conn;
                conn = parent->c_passive;
        }
-       spin_unlock_irqrestore(&rds_conn_lock, flags);
+       rcu_read_unlock();
        if (conn)
                goto out;
 
        conn = kmem_cache_zalloc(rds_conn_slab, gfp);
-       if (conn == NULL) {
+       if (!conn) {
                conn = ERR_PTR(-ENOMEM);
                goto out;
        }
@@ -159,7 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        spin_lock_init(&conn->c_lock);
        conn->c_next_tx_seq = 1;
 
-       mutex_init(&conn->c_send_lock);
+       init_waitqueue_head(&conn->c_waitq);
        INIT_LIST_HEAD(&conn->c_send_queue);
        INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -175,7 +164,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
         * can bind to the destination address then we'd rather the messages
         * flow through loopback rather than either transport.
         */
-       if (rds_trans_get_preferred(faddr)) {
+       loop_trans = rds_trans_get_preferred(faddr);
+       if (loop_trans) {
+               rds_trans_put(loop_trans);
                conn->c_loopback = 1;
                if (is_outgoing && trans->t_prefer_loopback) {
                        /* "outgoing" connection - and the transport
@@ -238,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                        kmem_cache_free(rds_conn_slab, conn);
                        conn = found;
                } else {
-                       hlist_add_head(&conn->c_hash_node, head);
+                       hlist_add_head_rcu(&conn->c_hash_node, head);
                        rds_cong_add_conn(conn);
                        rds_conn_count++;
                }
@@ -263,21 +254,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
 }
 EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
 
+void rds_conn_shutdown(struct rds_connection *conn)
+{
+       /* shut it down unless it's down already */
+       if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+               /*
+                * Quiesce the connection mgmt handlers before we start tearing
+                * things down. We don't hold the mutex for the entire
+                * duration of the shutdown operation, else we may be
+                * deadlocking with the CM handler. Instead, the CM event
+                * handler is supposed to check for state DISCONNECTING
+                */
+               mutex_lock(&conn->c_cm_lock);
+               if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
+                && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
+                       rds_conn_error(conn, "shutdown called in state %d\n",
+                                       atomic_read(&conn->c_state));
+                       mutex_unlock(&conn->c_cm_lock);
+                       return;
+               }
+               mutex_unlock(&conn->c_cm_lock);
+
+               wait_event(conn->c_waitq,
+                          !test_bit(RDS_IN_XMIT, &conn->c_flags));
+
+               conn->c_trans->conn_shutdown(conn);
+               rds_conn_reset(conn);
+
+               if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+                       /* This can happen - eg when we're in the middle of tearing
+                        * down the connection, and someone unloads the rds module.
+                        * Quite reproduceable with loopback connections.
+                        * Mostly harmless.
+                        */
+                       rds_conn_error(conn,
+                               "%s: failed to transition to state DOWN, "
+                               "current state is %d\n",
+                               __func__,
+                               atomic_read(&conn->c_state));
+                       return;
+               }
+       }
+
+       /* Then reconnect if it's still live.
+        * The passive side of an IB loopback connection is never added
+        * to the conn hash, so we never trigger a reconnect on this
+        * conn - the reconnect is always triggered by the active peer. */
+       cancel_delayed_work_sync(&conn->c_conn_w);
+       rcu_read_lock();
+       if (!hlist_unhashed(&conn->c_hash_node)) {
+               rcu_read_unlock();
+               rds_queue_reconnect(conn);
+       } else {
+               rcu_read_unlock();
+       }
+}
+
+/*
+ * Stop and free a connection.
+ *
+ * This can only be used in very limited circumstances.  It assumes that once
+ * the conn has been shutdown that no one else is referencing the connection.
+ * We can only ensure this in the rmmod path in the current code.
+ */
 void rds_conn_destroy(struct rds_connection *conn)
 {
        struct rds_message *rm, *rtmp;
+       unsigned long flags;
 
        rdsdebug("freeing conn %p for %pI4 -> "
                 "%pI4\n", conn, &conn->c_laddr,
                 &conn->c_faddr);
 
-       hlist_del_init(&conn->c_hash_node);
+       /* Ensure conn will not be scheduled for reconnect */
+       spin_lock_irq(&rds_conn_lock);
+       hlist_del_init_rcu(&conn->c_hash_node);
+       spin_unlock_irq(&rds_conn_lock);
+       synchronize_rcu();
 
-       /* wait for the rds thread to shut it down */
-       atomic_set(&conn->c_state, RDS_CONN_ERROR);
-       cancel_delayed_work(&conn->c_conn_w);
-       queue_work(rds_wq, &conn->c_down_w);
-       flush_workqueue(rds_wq);
+       /* shut the connection down */
+       rds_conn_drop(conn);
+       flush_work(&conn->c_down_w);
+
+       /* make sure lingering queued work won't try to ref the conn */
+       cancel_delayed_work_sync(&conn->c_send_w);
+       cancel_delayed_work_sync(&conn->c_recv_w);
 
        /* tear down queued messages */
        list_for_each_entry_safe(rm, rtmp,
@@ -302,7 +363,9 @@ void rds_conn_destroy(struct rds_connection *conn)
        BUG_ON(!list_empty(&conn->c_retrans));
        kmem_cache_free(rds_conn_slab, conn);
 
+       spin_lock_irqsave(&rds_conn_lock, flags);
        rds_conn_count--;
+       spin_unlock_irqrestore(&rds_conn_lock, flags);
 }
 EXPORT_SYMBOL_GPL(rds_conn_destroy);
 
@@ -316,23 +379,23 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
        struct list_head *list;
        struct rds_connection *conn;
        struct rds_message *rm;
-       unsigned long flags;
        unsigned int total = 0;
+       unsigned long flags;
        size_t i;
 
        len /= sizeof(struct rds_info_message);
 
-       spin_lock_irqsave(&rds_conn_lock, flags);
+       rcu_read_lock();
 
        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
             i++, head++) {
-               hlist_for_each_entry(conn, pos, head, c_hash_node) {
+               hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
                        if (want_send)
                                list = &conn->c_send_queue;
                        else
                                list = &conn->c_retrans;
 
-                       spin_lock(&conn->c_lock);
+                       spin_lock_irqsave(&conn->c_lock, flags);
 
                        /* XXX too lazy to maintain counts.. */
                        list_for_each_entry(rm, list, m_conn_item) {
@@ -343,11 +406,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
                                                          conn->c_faddr, 0);
                        }
 
-                       spin_unlock(&conn->c_lock);
+                       spin_unlock_irqrestore(&conn->c_lock, flags);
                }
        }
-
-       spin_unlock_irqrestore(&rds_conn_lock, flags);
+       rcu_read_unlock();
 
        lens->nr = total;
        lens->each = sizeof(struct rds_info_message);
@@ -377,19 +439,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
        uint64_t buffer[(item_len + 7) / 8];
        struct hlist_head *head;
        struct hlist_node *pos;
-       struct hlist_node *tmp;
        struct rds_connection *conn;
-       unsigned long flags;
        size_t i;
 
-       spin_lock_irqsave(&rds_conn_lock, flags);
+       rcu_read_lock();
 
        lens->nr = 0;
        lens->each = item_len;
 
        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
             i++, head++) {
-               hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+               hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 
                        /* XXX no c_lock usage.. */
                        if (!visitor(conn, buffer))
@@ -405,8 +465,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
                        lens->nr++;
                }
        }
-
-       spin_unlock_irqrestore(&rds_conn_lock, flags);
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
 
@@ -423,8 +482,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
                sizeof(cinfo->transport));
        cinfo->flags = 0;
 
-       rds_conn_info_set(cinfo->flags,
-                         rds_conn_is_sending(conn), SENDING);
+       rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+                         SENDING);
        /* XXX Future: return the state rather than these funky bits */
        rds_conn_info_set(cinfo->flags,
                          atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
@@ -444,12 +503,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
                                sizeof(struct rds_info_connection));
 }
 
-int __init rds_conn_init(void)
+int rds_conn_init(void)
 {
        rds_conn_slab = kmem_cache_create("rds_connection",
                                          sizeof(struct rds_connection),
                                          0, 0, NULL);
-       if (rds_conn_slab == NULL)
+       if (!rds_conn_slab)
                return -ENOMEM;
 
        rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
@@ -486,6 +545,18 @@ void rds_conn_drop(struct rds_connection *conn)
 }
 EXPORT_SYMBOL_GPL(rds_conn_drop);
 
+/*
+ * If the connection is down, trigger a connect. We may have scheduled a
+ * delayed reconnect however - in this case we should not interfere.
+ */
+void rds_conn_connect_if_down(struct rds_connection *conn)
+{
+       if (rds_conn_state(conn) == RDS_CONN_DOWN &&
+           !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
+               queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+}
+EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
+
 /*
  * An error occurred on the connection
  */