git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
author Linus Torvalds <torvalds@linux-foundation.org>
Thu, 19 Sep 2013 17:50:37 +0000 (12:50 -0500)
committer Linus Torvalds <torvalds@linux-foundation.org>
Thu, 19 Sep 2013 17:50:37 +0000 (12:50 -0500)
Pull ceph fixes from Sage Weil:
 "These fix several bugs with RBD from 3.11 that didn't get tested in
  time for the merge window: some error handling, a use-after-free, and
  a sequencing issue when unmapping an image races with a notify
  operation.

  There is also a patch fixing a problem with the new ceph + fscache
  code that just went in"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  fscache: check consistency does not decrement refcount
  rbd: fix error handling from rbd_snap_name()
  rbd: ignore unmapped snapshots that no longer exist
  rbd: fix use-after free of rbd_dev->disk
  rbd: make rbd_obj_notify_ack() synchronous
  rbd: complete notifies before cleaning up osd_client and rbd_dev
  libceph: add function to ensure notifies are complete

drivers/block/rbd.c
fs/fscache/cookie.c
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index b22a7d0fe5b72134c392a34d64b73e668f4485d4..cb1db2979d3d7b5417a8a4b131e09c5c5f6767c0 100644 (file)
@@ -931,12 +931,14 @@ static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
 {
        u32 which;
+       const char *snap_name;
 
        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
-               return NULL;
+               return ERR_PTR(-ENOENT);
 
-       return _rbd_dev_v1_snap_name(rbd_dev, which);
+       snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
+       return snap_name ? snap_name : ERR_PTR(-ENOMEM);
 }
 
 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
@@ -2812,7 +2814,7 @@ out_err:
        obj_request_done_set(obj_request);
 }
 
-static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
+static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
 {
        struct rbd_obj_request *obj_request;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -2827,16 +2829,17 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
-       obj_request->callback = rbd_obj_request_put;
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
                                        notify_id, 0, 0);
        rbd_osd_req_format_read(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
-out:
        if (ret)
-               rbd_obj_request_put(obj_request);
+               goto out;
+       ret = rbd_obj_request_wait(obj_request);
+out:
+       rbd_obj_request_put(obj_request);
 
        return ret;
 }
@@ -2856,7 +2859,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        if (ret)
                rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
 
-       rbd_obj_notify_ack(rbd_dev, notify_id);
+       rbd_obj_notify_ack_sync(rbd_dev, notify_id);
 }
 
 /*
@@ -3328,6 +3331,31 @@ static void rbd_exists_validate(struct rbd_device *rbd_dev)
                clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 }
 
+static void rbd_dev_update_size(struct rbd_device *rbd_dev)
+{
+       sector_t size;
+       bool removing;
+
+       /*
+        * Don't hold the lock while doing disk operations,
+        * or lock ordering will conflict with the bdev mutex via:
+        * rbd_add() -> blkdev_get() -> rbd_open()
+        */
+       spin_lock_irq(&rbd_dev->lock);
+       removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
+       spin_unlock_irq(&rbd_dev->lock);
+       /*
+        * If the device is being removed, rbd_dev->disk has
+        * been destroyed, so don't try to update its size
+        */
+       if (!removing) {
+               size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+               dout("setting size to %llu sectors", (unsigned long long)size);
+               set_capacity(rbd_dev->disk, size);
+               revalidate_disk(rbd_dev->disk);
+       }
+}
+
 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 {
        u64 mapping_size;
@@ -3347,12 +3375,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
        up_write(&rbd_dev->header_rwsem);
 
        if (mapping_size != rbd_dev->mapping.size) {
-               sector_t size;
-
-               size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
-               dout("setting size to %llu sectors", (unsigned long long)size);
-               set_capacity(rbd_dev->disk, size);
-               revalidate_disk(rbd_dev->disk);
+               rbd_dev_update_size(rbd_dev);
        }
 
        return ret;
@@ -4061,8 +4084,13 @@ static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
 
                snap_id = snapc->snaps[which];
                snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
-               if (IS_ERR(snap_name))
-                       break;
+               if (IS_ERR(snap_name)) {
+                       /* ignore no-longer existing snapshots */
+                       if (PTR_ERR(snap_name) == -ENOENT)
+                               continue;
+                       else
+                               break;
+               }
                found = !strcmp(name, snap_name);
                kfree(snap_name);
        }
@@ -4141,8 +4169,8 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
        /* Look up the snapshot name, and make a copy */
 
        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
-       if (!snap_name) {
-               ret = -ENOMEM;
+       if (IS_ERR(snap_name)) {
+               ret = PTR_ERR(snap_name);
                goto out_err;
        }
 
@@ -5163,10 +5191,23 @@ static ssize_t rbd_remove(struct bus_type *bus,
        if (ret < 0 || already)
                return ret;
 
-       rbd_bus_del_dev(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
+
+       /*
+        * flush remaining watch callbacks - these must be complete
+        * before the osd_client is shutdown
+        */
+       dout("%s: flushing notifies", __func__);
+       ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
+       /*
+        * Don't free anything from rbd_dev->disk until after all
+        * notifies are completely processed. Otherwise
+        * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
+        * in a potential use after free of rbd_dev->disk or rbd_dev.
+        */
+       rbd_bus_del_dev(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
 
index 318e8433527c432984e61bf68594e305da3a840c..b2a86e324aac05f7bf64b7ce0d0e2c30d91f3d68 100644 (file)
@@ -586,7 +586,8 @@ int __fscache_check_consistency(struct fscache_cookie *cookie)
 
        fscache_operation_init(op, NULL, NULL);
        op->flags = FSCACHE_OP_MYTHREAD |
-               (1 << FSCACHE_OP_WAITING);
+               (1 << FSCACHE_OP_WAITING) |
+               (1 << FSCACHE_OP_UNUSE_COOKIE);
 
        spin_lock(&cookie->lock);
 
index ce6df39f60ff6f966144855ca0e1bdb04c312903..8f47625a06615dbf5cbc79ef5df2e2fd5cee6041 100644 (file)
@@ -335,6 +335,8 @@ extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req);
 extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
 
+extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc);
+
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
index 1606f740d6ae0d1b7ea8aeb1ba8126fd7693888d..2b4b32aaa893b3117043e6a218fcde6c58f0aff4 100644 (file)
@@ -2215,6 +2215,17 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
 }
 EXPORT_SYMBOL(ceph_osdc_sync);
 
+/*
+ * Call all pending notify callbacks - for use after a watch is
+ * unregistered, to make sure no more callbacks for it will be invoked
+ */
+extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
+{
+       flush_workqueue(osdc->notify_wq);
+}
+EXPORT_SYMBOL(ceph_osdc_flush_notifies);
+
+
 /*
  * init, shutdown
  */