diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 242231f09464166fd3d9590952a2555c3fa654fc..18a833c450c88bde4d3ed180d907cc8156906ee2 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
  */
 #include <linux/kernel.h>
 #include <linux/slab.h>
+#include <linux/rculist.h>
 
 #include "rds.h"
 #include "ib.h"
+#include "xlist.h"
 
+static struct workqueue_struct *rds_ib_fmr_wq;
+
+static DEFINE_PER_CPU(unsigned long, clean_list_grace);
+#define CLEAN_LIST_BUSY_BIT 0
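
For context: xlist.h, included above and added alongside this patch, is a minimal lock-free singly-linked list built on cmpxchg(); it is what lets MRs move between the pool's lists without a spinlock. The following is a simplified reconstruction of the operations this file relies on, not the verbatim header (xlist_del_head_fast, used further down, is the same pop as xlist_del_head without the cmpxchg loop, for lists the caller owns exclusively).

/* Simplified reconstruction of net/rds/xlist.h; cmpxchg()/xchg()
 * are the usual kernel primitives. */
struct xlist_head {
        struct xlist_head *next;
};

static inline void INIT_XLIST_HEAD(struct xlist_head *list)
{
        list->next = NULL;
}

/* Push a pre-linked chain (new..tail) onto the front of head. */
static inline void xlist_add(struct xlist_head *new, struct xlist_head *tail,
                             struct xlist_head *head)
{
        struct xlist_head *cur;
        struct xlist_head *check;

        while (1) {
                cur = head->next;
                tail->next = cur;
                check = cmpxchg(&head->next, cur, new);
                if (check == cur)
                        break;
        }
}

/* Pop the first entry or return NULL.  Note the window where we read
 * cur->next: if cur were freed and recycled right here, we could
 * publish a stale pointer.  This is the race the per-CPU
 * clean_list_grace bit above guards against. */
static inline struct xlist_head *xlist_del_head(struct xlist_head *head)
{
        struct xlist_head *cur;
        struct xlist_head *check;
        struct xlist_head *next;

        while (1) {
                cur = head->next;
                if (!cur)
                        return NULL;
                next = cur->next;
                check = cmpxchg(&head->next, cur, next);
                if (check == cur)
                        return cur;
        }
}

/* Atomically detach everything on list into head (head must be empty). */
static inline void xlist_splice(struct xlist_head *list,
                                struct xlist_head *head)
{
        head->next = xchg(&list->next, NULL);
}

static inline int xlist_empty(struct xlist_head *head)
{
        return head->next == NULL;
}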
 
 /*
  * This is stored as mr->r_trans_private.
@@ -44,7 +50,11 @@ struct rds_ib_mr {
        struct rds_ib_device    *device;
        struct rds_ib_mr_pool   *pool;
        struct ib_fmr           *fmr;
-       struct list_head        list;
+
+       struct xlist_head       xlist;
+
+       /* unmap_list is for freeing */
+       struct list_head        unmap_list;
        unsigned int            remap_count;
 
        struct scatterlist      *sg;
@@ -58,14 +68,16 @@ struct rds_ib_mr {
  */
 struct rds_ib_mr_pool {
        struct mutex            flush_lock;             /* serialize fmr invalidate */
-       struct work_struct      flush_worker;           /* flush worker */
+       struct delayed_work     flush_worker;           /* flush worker */
 
-       spinlock_t              list_lock;              /* protect variables below */
        atomic_t                item_count;             /* total # of MRs */
        atomic_t                dirty_count;            /* # dirty of MRs */
-       struct list_head        drop_list;              /* MRs that have reached their max_maps limit */
-       struct list_head        free_list;              /* unused MRs */
-       struct list_head        clean_list;             /* unused & unamapped MRs */
+
+       struct xlist_head       drop_list;              /* MRs that have reached their max_maps limit */
+       struct xlist_head       free_list;              /* unused MRs */
+       struct xlist_head       clean_list;             /* global unused & unmapped MRs */
+       wait_queue_head_t       flush_wait;
+
        atomic_t                free_pinned;            /* memory pinned by free MRs */
        unsigned long           max_items;
        unsigned long           max_items_soft;
@@ -73,7 +85,7 @@ struct rds_ib_mr_pool {
        struct ib_fmr_attr      fmr_attr;
 };
 
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
+static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
 static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
 
@@ -82,16 +94,17 @@ static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
        struct rds_ib_device *rds_ibdev;
        struct rds_ib_ipaddr *i_ipaddr;
 
-       list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
-               spin_lock_irq(&rds_ibdev->spinlock);
-               list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
+       rcu_read_lock();
+       list_for_each_entry_rcu(rds_ibdev, &rds_ib_devices, list) {
+               list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
                        if (i_ipaddr->ipaddr == ipaddr) {
-                               spin_unlock_irq(&rds_ibdev->spinlock);
+                               atomic_inc(&rds_ibdev->refcount);
+                               rcu_read_unlock();
                                return rds_ibdev;
                        }
                }
-               spin_unlock_irq(&rds_ibdev->spinlock);
        }
+       rcu_read_unlock();
 
        return NULL;
 }
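
The lookup above replaces the per-device spinlock walk with the standard RCU pattern: readers traverse rds_ib_devices and each ipaddr_list under rcu_read_lock(), and pin the matched device with atomic_inc(&rds_ibdev->refcount) before leaving the read-side section. The matching put, rds_ib_dev_put(), is added elsewhere in this series (net/rds/ib.c); its exact body is not shown here, but it must have roughly this shape:

/* Assumed shape of rds_ib_dev_put() from net/rds/ib.c; refcount and
 * free_work are fields this series adds to struct rds_ib_device. */
static inline void rds_ib_dev_put(struct rds_ib_device *rds_ibdev)
{
        /* Last put defers teardown to process context. */
        if (atomic_dec_and_test(&rds_ibdev->refcount))
                queue_work(rds_wq, &rds_ibdev->free_work);
}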
@@ -107,7 +120,7 @@ static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
        i_ipaddr->ipaddr = ipaddr;
 
        spin_lock_irq(&rds_ibdev->spinlock);
-       list_add_tail(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
+       list_add_tail_rcu(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
        spin_unlock_irq(&rds_ibdev->spinlock);
 
        return 0;
@@ -115,17 +128,24 @@ static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 
 static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 {
-       struct rds_ib_ipaddr *i_ipaddr, *next;
+       struct rds_ib_ipaddr *i_ipaddr;
+       struct rds_ib_ipaddr *to_free = NULL;
 
        spin_lock_irq(&rds_ibdev->spinlock);
-       list_for_each_entry_safe(i_ipaddr, next, &rds_ibdev->ipaddr_list, list) {
+       list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
                if (i_ipaddr->ipaddr == ipaddr) {
-                       list_del(&i_ipaddr->list);
-                       kfree(i_ipaddr);
+                       list_del_rcu(&i_ipaddr->list);
+                       to_free = i_ipaddr;
                        break;
                }
        }
        spin_unlock_irq(&rds_ibdev->spinlock);
+
+       if (to_free) {
+               synchronize_rcu();
+               kfree(to_free);
+       }
 }
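
Removal is the matching RCU writer side: unlink with list_del_rcu() under the spinlock, then defer the kfree() until synchronize_rcu() guarantees no reader can still hold the pointer. Blocking in synchronize_rcu() is fine because this path runs in process context; a non-blocking alternative would embed an rcu_head in the entry and use call_rcu(), as in this hypothetical variant (the rcu field below does not exist in the patch):

struct rds_ib_ipaddr {
        struct list_head        list;
        __be32                  ipaddr;
        struct rcu_head         rcu;    /* hypothetical addition */
};

static void rds_ib_ipaddr_free_rcu(struct rcu_head *head)
{
        kfree(container_of(head, struct rds_ib_ipaddr, rcu));
}

/* ...then in rds_ib_remove_ipaddr(), instead of
 *      synchronize_rcu();
 *      kfree(to_free);
 * one would write:
 *      call_rcu(&to_free->rcu, rds_ib_ipaddr_free_rcu);
 */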
 
 int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
@@ -133,8 +153,10 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
        struct rds_ib_device *rds_ibdev_old;
 
        rds_ibdev_old = rds_ib_get_device(ipaddr);
-       if (rds_ibdev_old)
+       if (rds_ibdev_old) {
                rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
+               rds_ib_dev_put(rds_ibdev_old);
+       }
 
        return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
 }
@@ -149,12 +171,13 @@ void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *con
        BUG_ON(list_empty(&ic->ib_node));
        list_del(&ic->ib_node);
 
-       spin_lock_irq(&rds_ibdev->spinlock);
+       spin_lock(&rds_ibdev->spinlock);
        list_add_tail(&ic->ib_node, &rds_ibdev->conn_list);
-       spin_unlock_irq(&rds_ibdev->spinlock);
+       spin_unlock(&rds_ibdev->spinlock);
        spin_unlock_irq(&ib_nodev_conns_lock);
 
        ic->rds_ibdev = rds_ibdev;
+       atomic_inc(&rds_ibdev->refcount);
 }
 
 void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
@@ -174,18 +197,18 @@ void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *
        spin_unlock(&ib_nodev_conns_lock);
 
        ic->rds_ibdev = NULL;
+       rds_ib_dev_put(rds_ibdev);
 }
 
-void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock)
+void rds_ib_destroy_nodev_conns(void)
 {
        struct rds_ib_connection *ic, *_ic;
        LIST_HEAD(tmp_list);
 
        /* avoid calling conn_destroy with irqs off */
-       spin_lock_irq(list_lock);
-       list_splice(list, &tmp_list);
-       INIT_LIST_HEAD(list);
-       spin_unlock_irq(list_lock);
+       spin_lock_irq(&ib_nodev_conns_lock);
+       list_splice(&ib_nodev_conns, &tmp_list);
+       spin_unlock_irq(&ib_nodev_conns_lock);
 
        list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node)
                rds_conn_destroy(ic->conn);
@@ -199,12 +222,12 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
        if (!pool)
                return ERR_PTR(-ENOMEM);
 
-       INIT_LIST_HEAD(&pool->free_list);
-       INIT_LIST_HEAD(&pool->drop_list);
-       INIT_LIST_HEAD(&pool->clean_list);
+       INIT_XLIST_HEAD(&pool->free_list);
+       INIT_XLIST_HEAD(&pool->drop_list);
+       INIT_XLIST_HEAD(&pool->clean_list);
        mutex_init(&pool->flush_lock);
-       spin_lock_init(&pool->list_lock);
-       INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
+       init_waitqueue_head(&pool->flush_wait);
+       INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
        pool->fmr_attr.max_pages = fmr_message_size;
        pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
@@ -232,34 +255,60 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
 
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
 {
-       flush_workqueue(rds_wq);
-       rds_ib_flush_mr_pool(pool, 1);
+       cancel_delayed_work_sync(&pool->flush_worker);
+       rds_ib_flush_mr_pool(pool, 1, NULL);
        WARN_ON(atomic_read(&pool->item_count));
        WARN_ON(atomic_read(&pool->free_pinned));
        kfree(pool);
 }
 
+static void refill_local(struct rds_ib_mr_pool *pool, struct xlist_head *xl,
+                        struct rds_ib_mr **ibmr_ret)
+{
+       struct xlist_head *ibmr_xl;
+       ibmr_xl = xlist_del_head_fast(xl);
+       *ibmr_ret = list_entry(ibmr_xl, struct rds_ib_mr, xlist);
+}
+
 static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
 {
        struct rds_ib_mr *ibmr = NULL;
-       unsigned long flags;
+       struct xlist_head *ret;
+       unsigned long *flag;
 
-       spin_lock_irqsave(&pool->list_lock, flags);
-       if (!list_empty(&pool->clean_list)) {
-               ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
-               list_del_init(&ibmr->list);
-       }
-       spin_unlock_irqrestore(&pool->list_lock, flags);
+       preempt_disable();
+       flag = &__get_cpu_var(clean_list_grace);
+       set_bit(CLEAN_LIST_BUSY_BIT, flag);
+       ret = xlist_del_head(&pool->clean_list);
+       if (ret)
+               ibmr = list_entry(ret, struct rds_ib_mr, xlist);
 
+       clear_bit(CLEAN_LIST_BUSY_BIT, flag);
+       preempt_enable();
        return ibmr;
 }
 
+static inline void wait_clean_list_grace(void)
+{
+       int cpu;
+       unsigned long *flag;
+
+       for_each_online_cpu(cpu) {
+               flag = &per_cpu(clean_list_grace, cpu);
+               while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
+                       cpu_relax();
+       }
+}
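
wait_clean_list_grace() closes the hole left open by lock-free pops: a CPU in xlist_del_head() may have read an entry's next pointer and lost the cmpxchg race, yet still be touching that entry. Every pop of clean_list is therefore bracketed by a per-CPU busy bit under preempt_disable() (see rds_ib_reuse_fmr() above), and the flusher spins until all bits are clear before making recycled entries reachable again. A hypothetical helper showing the required ordering:

/* Hypothetical wrapper, assuming the pool layout above: recycled MRs
 * must not reappear on clean_list while any CPU may still be mid-pop. */
static void publish_clean_mrs(struct rds_ib_mr_pool *pool,
                              struct xlist_head *head,
                              struct xlist_head *tail)
{
        wait_clean_list_grace();
        xlist_add(head, tail, &pool->clean_list);
}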
+
 static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 {
        struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
        struct rds_ib_mr *ibmr = NULL;
        int err = 0, iter = 0;
 
+       if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
+               queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
+
        while (1) {
                ibmr = rds_ib_reuse_fmr(pool);
                if (ibmr)
@@ -286,21 +335,24 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 
                /* We do have some empty MRs. Flush them out. */
                rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
-               rds_ib_flush_mr_pool(pool, 0);
+               rds_ib_flush_mr_pool(pool, 0, &ibmr);
+               if (ibmr)
+                       return ibmr;
        }
 
-       ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
+       ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
        if (!ibmr) {
                err = -ENOMEM;
                goto out_no_cigar;
        }
 
        ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
                        (IB_ACCESS_LOCAL_WRITE |
                         IB_ACCESS_REMOTE_READ |
                         IB_ACCESS_REMOTE_WRITE|
                         IB_ACCESS_REMOTE_ATOMIC),
-
                        &pool->fmr_attr);
        if (IS_ERR(ibmr->fmr)) {
                err = PTR_ERR(ibmr->fmr);
@@ -368,7 +420,8 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
        if (page_cnt > fmr_message_size)
                return -EINVAL;
 
-       dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
+       dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
+                                rdsibdev_to_node(rds_ibdev));
        if (!dma_pages)
                return -ENOMEM;
 
@@ -477,34 +530,110 @@ static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int fr
        return 0;
 }
 
+/*
+ * given an xlist of mrs, put them all into the list_head for more processing
+ */
+static void xlist_append_to_list(struct xlist_head *xlist, struct list_head *list)
+{
+       struct rds_ib_mr *ibmr;
+       struct xlist_head splice;
+       struct xlist_head *cur;
+       struct xlist_head *next;
+
+       splice.next = NULL;
+       xlist_splice(xlist, &splice);
+       cur = splice.next;
+       while (cur) {
+               next = cur->next;
+               ibmr = list_entry(cur, struct rds_ib_mr, xlist);
+               list_add_tail(&ibmr->unmap_list, list);
+               cur = next;
+       }
+}
+
+/*
+ * this takes a list_head of mrs and chains them into a single xlist,
+ * returning the tail so the caller can splice the whole chain onto the
+ * pool's clean_list in one operation.
+ */
+static void list_append_to_xlist(struct rds_ib_mr_pool *pool,
+                               struct list_head *list, struct xlist_head *xlist,
+                               struct xlist_head **tail_ret)
+{
+       struct rds_ib_mr *ibmr;
+       struct xlist_head *cur_mr = xlist;
+       struct xlist_head *tail_mr = NULL;
+
+       list_for_each_entry(ibmr, list, unmap_list) {
+               tail_mr = &ibmr->xlist;
+               tail_mr->next = NULL;
+               cur_mr->next = tail_mr;
+               cur_mr = tail_mr;
+       }
+       *tail_ret = tail_mr;
+}
+
 /*
  * Flush our pool of MRs.
  * At a minimum, all currently unused MRs are unmapped.
  * If the number of MRs allocated exceeds the limit, we also try
  * to free as many MRs as needed to get back to this limit.
  */
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
+static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
+                               int free_all, struct rds_ib_mr **ibmr_ret)
 {
        struct rds_ib_mr *ibmr, *next;
+       struct xlist_head clean_xlist;
+       struct xlist_head *clean_tail;
        LIST_HEAD(unmap_list);
        LIST_HEAD(fmr_list);
        unsigned long unpinned = 0;
-       unsigned long flags;
        unsigned int nfreed = 0, ncleaned = 0, free_goal;
        int ret = 0;
 
        rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
 
-       mutex_lock(&pool->flush_lock);
+       if (ibmr_ret) {
+               DEFINE_WAIT(wait);
+               while (!mutex_trylock(&pool->flush_lock)) {
+                       ibmr = rds_ib_reuse_fmr(pool);
+                       if (ibmr) {
+                               *ibmr_ret = ibmr;
+                               finish_wait(&pool->flush_wait, &wait);
+                               goto out_nolock;
+                       }
+
+                       prepare_to_wait(&pool->flush_wait, &wait,
+                                       TASK_UNINTERRUPTIBLE);
+                       if (xlist_empty(&pool->clean_list))
+                               schedule();
+
+                       ibmr = rds_ib_reuse_fmr(pool);
+                       if (ibmr) {
+                               *ibmr_ret = ibmr;
+                               finish_wait(&pool->flush_wait, &wait);
+                               goto out_nolock;
+                       }
+               }
+               finish_wait(&pool->flush_wait, &wait);
+       } else {
+               mutex_lock(&pool->flush_lock);
+       }
+
+       if (ibmr_ret) {
+               ibmr = rds_ib_reuse_fmr(pool);
+               if (ibmr) {
+                       *ibmr_ret = ibmr;
+                       goto out;
+               }
+       }
 
-       spin_lock_irqsave(&pool->list_lock, flags);
        /* Get the list of all MRs to be dropped. Ordering matters -
-        * we want to put drop_list ahead of free_list. */
-       list_splice_init(&pool->free_list, &unmap_list);
-       list_splice_init(&pool->drop_list, &unmap_list);
+        * we want to put drop_list ahead of free_list.
+        */
+       xlist_append_to_list(&pool->drop_list, &unmap_list);
+       xlist_append_to_list(&pool->free_list, &unmap_list);
        if (free_all)
-               list_splice_init(&pool->clean_list, &unmap_list);
-       spin_unlock_irqrestore(&pool->list_lock, flags);
+               xlist_append_to_list(&pool->clean_list, &unmap_list);
 
        free_goal = rds_ib_flush_goal(pool, free_all);
 
@@ -512,19 +641,20 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
                goto out;
 
        /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
-       list_for_each_entry(ibmr, &unmap_list, list)
+       list_for_each_entry(ibmr, &unmap_list, unmap_list)
                list_add(&ibmr->fmr->list, &fmr_list);
+
        ret = ib_unmap_fmr(&fmr_list);
        if (ret)
                printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
 
        /* Now we can destroy the DMA mapping and unpin any pages */
-       list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
+       list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
                unpinned += ibmr->sg_len;
                __rds_ib_teardown_mr(ibmr);
                if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
                        rds_ib_stats_inc(s_ib_rdma_mr_free);
-                       list_del(&ibmr->list);
+                       list_del(&ibmr->unmap_list);
                        ib_dealloc_fmr(ibmr->fmr);
                        kfree(ibmr);
                        nfreed++;
@@ -532,9 +662,27 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
                ncleaned++;
        }
 
-       spin_lock_irqsave(&pool->list_lock, flags);
-       list_splice(&unmap_list, &pool->clean_list);
-       spin_unlock_irqrestore(&pool->list_lock, flags);
+       if (!list_empty(&unmap_list)) {
+               /* we have to make sure that none of the things we're about
+                * to put on the clean list would race with other cpus trying
+                * to pull items off.  The xlist would explode if we managed to
+                * remove something from the clean list and then add it back again
+                * while another CPU was spinning on that same item in xlist_del_head.
+                *
+                * This is pretty unlikely, but just in case, wait for an xlist
+                * grace period here before adding anything back into the clean list.
+                */
+               wait_clean_list_grace();
+
+               list_append_to_xlist(pool, &unmap_list, &clean_xlist, &clean_tail);
+               if (ibmr_ret)
+                       refill_local(pool, &clean_xlist, ibmr_ret);
+
+               /* refill_local may have emptied our list */
+               if (!xlist_empty(&clean_xlist))
+                       xlist_add(clean_xlist.next, clean_tail, &pool->clean_list);
+
+       }
 
        atomic_sub(unpinned, &pool->free_pinned);
        atomic_sub(ncleaned, &pool->dirty_count);
@@ -542,14 +690,35 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 
 out:
        mutex_unlock(&pool->flush_lock);
+       if (waitqueue_active(&pool->flush_wait))
+               wake_up(&pool->flush_wait);
+out_nolock:
        return ret;
 }
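
The net effect of the trylock/waitqueue dance at the top of this function: only one thread at a time pays for ib_unmap_fmr(), while concurrent allocators either sleep on flush_wait or pop an MR the active flusher has already published. A caller that passes ibmr_ret (the allocation slow path) is handed the first recycled MR directly via refill_local() before the remainder goes back onto clean_list. Condensed caller-side shape, paraphrasing the loop in rds_ib_alloc_fmr():

/* Paraphrase of the allocation slow path, assuming the API above. */
static struct rds_ib_mr *alloc_via_flush(struct rds_ib_mr_pool *pool)
{
        struct rds_ib_mr *ibmr = NULL;

        /* Either become the flusher, or sleep until the current one
         * publishes clean MRs; ibmr may come back already set. */
        rds_ib_flush_mr_pool(pool, 0, &ibmr);
        return ibmr;    /* NULL means the caller retries its fast path */
}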
 
+int rds_ib_fmr_init(void)
+{
+       rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd");
+       if (!rds_ib_fmr_wq)
+               return -ENOMEM;
+       return 0;
+}
+
+/*
+ * By the time this is called all the IB devices should have been torn down and
+ * had their pools freed.  As each pool is freed its work struct is waited on,
+ * so the pool flushing work queue should be idle by the time we get here.
+ */
+void rds_ib_fmr_exit(void)
+{
+       destroy_workqueue(rds_ib_fmr_wq);
+}
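
rds_ib_fmr_init() must run before the first device can create a pool, so the workqueue exists when a flush is first queued, and rds_ib_fmr_exit() must run after the last pool is destroyed. In this series the wiring belongs in rds_ib_init()/rds_ib_exit() in net/rds/ib.c; a condensed sketch of the expected order (simplified, the real init also sets up other RDS/IB subsystems):

int rds_ib_init(void)
{
        int ret;

        ret = rds_ib_fmr_init();
        if (ret)
                goto out;

        ret = ib_register_client(&rds_ib_client);
        if (ret)
                goto out_fmr_exit;

        ret = rds_trans_register(&rds_ib_transport);
        if (ret)
                goto out_unregister_client;

        return 0;

out_unregister_client:
        ib_unregister_client(&rds_ib_client);
out_fmr_exit:
        rds_ib_fmr_exit();
out:
        return ret;
}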
+
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
 {
-       struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker);
+       struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work);
 
-       rds_ib_flush_mr_pool(pool, 0);
+       rds_ib_flush_mr_pool(pool, 0, NULL);
 }
 
 void rds_ib_free_mr(void *trans_private, int invalidate)
@@ -557,47 +726,49 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
        struct rds_ib_mr *ibmr = trans_private;
        struct rds_ib_device *rds_ibdev = ibmr->device;
        struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
-       unsigned long flags;
 
        rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
 
        /* Return it to the pool's free list */
-       spin_lock_irqsave(&pool->list_lock, flags);
        if (ibmr->remap_count >= pool->fmr_attr.max_maps)
-               list_add(&ibmr->list, &pool->drop_list);
+               xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list);
        else
-               list_add(&ibmr->list, &pool->free_list);
+               xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list);
 
        atomic_add(ibmr->sg_len, &pool->free_pinned);
        atomic_inc(&pool->dirty_count);
-       spin_unlock_irqrestore(&pool->list_lock, flags);
 
        /* If we've pinned too many pages, request a flush */
        if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
            atomic_read(&pool->dirty_count) >= pool->max_items / 10)
-               queue_work(rds_wq, &pool->flush_worker);
+               queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
 
        if (invalidate) {
                if (likely(!in_interrupt())) {
-                       rds_ib_flush_mr_pool(pool, 0);
+                       rds_ib_flush_mr_pool(pool, 0, NULL);
                } else {
                        /* We get here if the user created a MR marked
                         * as use_once and invalidate at the same time. */
-                       queue_work(rds_wq, &pool->flush_worker);
+                       queue_delayed_work(rds_ib_fmr_wq,
+                                          &pool->flush_worker, 10);
                }
        }
+
+       rds_ib_dev_put(rds_ibdev);
 }
 
 void rds_ib_flush_mrs(void)
 {
        struct rds_ib_device *rds_ibdev;
 
+       down_read(&rds_ib_devices_lock);
        list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
                struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
 
                if (pool)
-                       rds_ib_flush_mr_pool(pool, 0);
+                       rds_ib_flush_mr_pool(pool, 0, NULL);
        }
+       up_read(&rds_ib_devices_lock);
 }
 
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
@@ -629,6 +800,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
                printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
 
        ibmr->device = rds_ibdev;
+       rds_ibdev = NULL;
 
  out:
        if (ret) {
@@ -636,5 +808,8 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
                        rds_ib_free_mr(ibmr, 0);
                ibmr = ERR_PTR(ret);
        }
+       if (rds_ibdev)
+               rds_ib_dev_put(rds_ibdev);
        return ibmr;
 }