git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - drivers/block/rbd.c
rbd: issue a copyup for layered writes
[karo-tx-linux.git] / drivers / block / rbd.c
index b7b7a88d9f689cdd3697ac8074de1887e7d50c15..c34719c917b1cb8a710fc0b9be12c2e3bfa6ae1d 100644 (file)
 #define        SECTOR_SHIFT    9
 #define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
 
-/* It might be useful to have these defined elsewhere */
-
-#define        U8_MAX  ((u8)   (~0U))
-#define        U16_MAX ((u16)  (~0U))
-#define        U32_MAX ((u32)  (~0U))
-#define        U64_MAX ((u64)  (~0ULL))
-
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
 
 /* Feature bits */
 
-#define RBD_FEATURE_LAYERING      1
+#define RBD_FEATURE_LAYERING   (1<<0)
+#define RBD_FEATURE_STRIPINGV2 (1<<1)
+#define RBD_FEATURES_ALL \
+           (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
 
 /* Features supported by this (client software) implementation. */
 
-#define RBD_FEATURES_ALL          (0)
+#define RBD_FEATURES_SUPPORTED (0)
 
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
@@ -174,13 +170,44 @@ enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 };
 
+enum obj_req_flags {
+       OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
+       OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
+       OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
+       OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
+};
+
 struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
+       unsigned long           flags;
 
-       struct rbd_img_request  *img_request;
-       struct list_head        links;          /* img_request->obj_requests */
+       /*
+        * An object request associated with an image will have its
+        * img_data flag set; a standalone object request will not.
+        *
+        * A standalone object request will have which == BAD_WHICH
+        * and a null obj_request pointer.
+        *
+        * An object request initiated in support of a layered image
+        * object (to check for its existence before a write) will
+        * have which == BAD_WHICH and a non-null obj_request pointer.
+        *
+        * Finally, an object request for rbd image data will have
+        * which != BAD_WHICH, and will have a non-null img_request
+        * pointer.  The value of which will be in the range
+        * 0..(img_request->obj_request_count-1).
+        */
+       union {
+               struct rbd_obj_request  *obj_request;   /* STAT op */
+               struct {
+                       struct rbd_img_request  *img_request;
+                       u64                     img_offset;
+                       /* links for img_request->obj_requests list */
+                       struct list_head        links;
+               };
+       };
        u32                     which;          /* posn image request list */
 
        enum obj_request_type   type;
@@ -191,13 +218,13 @@ struct rbd_obj_request {
                        u32             page_count;
                };
        };
+       struct page             **copyup_pages;
 
        struct ceph_osd_request *osd_req;
 
        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;
-       atomic_t                done;
 
        rbd_obj_callback_t      callback;
        struct completion       completion;
@@ -205,19 +232,31 @@ struct rbd_obj_request {
        struct kref             kref;
 };
 
+enum img_req_flags {
+       IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
+       IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
+       IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
+};
+
 struct rbd_img_request {
-       struct request          *rq;
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
-       bool                    write_request;  /* false for read */
+       unsigned long           flags;
        union {
+               u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
-               u64             snap_id;                /* for reads */
        };
+       union {
+               struct request          *rq;            /* block request */
+               struct rbd_obj_request  *obj_request;   /* obj req initiator */
+       };
+       struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
+       u64                     xferred;/* aggregate bytes transferred */
+       int                     result; /* first nonzero obj_request result */
 
        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */
@@ -276,6 +315,7 @@ struct rbd_device {
 
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
+       struct rbd_device       *parent;
 
        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
@@ -312,6 +352,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);             /* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
+
 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
@@ -322,6 +364,7 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
+static int rbd_dev_probe(struct rbd_device *rbd_dev);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -383,6 +426,9 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 #  define rbd_assert(expr)     ((void) 0)
 #endif /* !RBD_DEBUG */
 
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
+
 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 
@@ -484,6 +530,13 @@ out_opt:
        return ERR_PTR(ret);
 }
 
+static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
+{
+       kref_get(&rbdc->kref);
+
+       return rbdc;
+}
+
 /*
  * Find a ceph client with specific addr and configuration.  If
  * found, bump its reference count.
@@ -499,7 +552,8 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
-                       kref_get(&client_node->kref);
+                       __rbd_get_client(client_node);
+
                        found = true;
                        break;
                }
@@ -920,6 +974,37 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
        }
 }
 
+/*
+ * similar to zero_bio_chain(), zeros data defined by a page array,
+ * starting at the given byte offset from the start of the array and
+ * continuing up to the given end offset.  The pages array is
+ * assumed to be big enough to hold all bytes up to the end.
+ */
+static void zero_pages(struct page **pages, u64 offset, u64 end)
+{
+       struct page **page = &pages[offset >> PAGE_SHIFT];
+
+       rbd_assert(end > offset);
+       rbd_assert(end - offset <= (u64)SIZE_MAX);
+       while (offset < end) {
+               size_t page_offset;
+               size_t length;
+               unsigned long flags;
+               void *kaddr;
+
+               page_offset = (size_t)(offset & ~PAGE_MASK);
+               length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
+               local_irq_save(flags);
+               kaddr = kmap_atomic(*page);
+               memset(kaddr + page_offset, 0, length);
+               kunmap_atomic(kaddr);
+               local_irq_restore(flags);
+
+               offset += length;
+               page++;
+       }
+}
+
 /*
  * Clone a portion of a bio, starting at the given byte offset
  * and continuing for the number of bytes indicated.
@@ -1064,6 +1149,77 @@ out_err:
        return NULL;
 }
 
+/*
+ * The default/initial value for all object request flags is 0.  For
+ * each flag, once its value is set to 1 it is never reset to 0
+ * again.
+ */
+static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
+{
+       if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
+               struct rbd_device *rbd_dev;
+
+               rbd_dev = obj_request->img_request->rbd_dev;
+               rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+                       obj_request);
+       }
+}
+
+static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
+}
+
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
+{
+       if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
+               struct rbd_device *rbd_dev = NULL;
+
+               if (obj_request_img_data_test(obj_request))
+                       rbd_dev = obj_request->img_request->rbd_dev;
+               rbd_warn(rbd_dev, "obj_request %p already marked done\n",
+                       obj_request);
+       }
+}
+
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
+}
+
+/*
+ * This sets the KNOWN flag after (possibly) setting the EXISTS
+ * flag.  The latter is set based on the "exists" value provided.
+ *
+ * Note that for our purposes once an object exists it never goes
+ * away again.  It's possible that the response from two existence
+ * checks are separated by the creation of the target object, and
+ * the first ("doesn't exist") response arrives *after* the second
+ * ("does exist").  In that case we ignore the second one.
+ */
+static void obj_request_existence_set(struct rbd_obj_request *obj_request,
+                               bool exists)
+{
+       if (exists)
+               set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
+       set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
+       smp_mb();
+}
+
+static bool obj_request_known_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
+}
+
+static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
+}
+
 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
@@ -1101,9 +1257,11 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 {
        rbd_assert(obj_request->img_request == NULL);
 
-       rbd_obj_request_get(obj_request);
+       /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
+       rbd_assert(!obj_request_img_data_test(obj_request));
+       obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
@@ -1123,6 +1281,7 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
+       rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
@@ -1141,76 +1300,6 @@ static bool obj_request_type_valid(enum obj_request_type type)
        }
 }
 
-static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
-{
-       struct ceph_osd_req_op *op;
-       va_list args;
-       size_t size;
-
-       op = kzalloc(sizeof (*op), GFP_NOIO);
-       if (!op)
-               return NULL;
-       op->op = opcode;
-       va_start(args, opcode);
-       switch (opcode) {
-       case CEPH_OSD_OP_READ:
-       case CEPH_OSD_OP_WRITE:
-               /* rbd_osd_req_op_create(READ, offset, length) */
-               /* rbd_osd_req_op_create(WRITE, offset, length) */
-               op->extent.offset = va_arg(args, u64);
-               op->extent.length = va_arg(args, u64);
-               if (opcode == CEPH_OSD_OP_WRITE)
-                       op->payload_len = op->extent.length;
-               break;
-       case CEPH_OSD_OP_STAT:
-               break;
-       case CEPH_OSD_OP_CALL:
-               /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
-               op->cls.class_name = va_arg(args, char *);
-               size = strlen(op->cls.class_name);
-               rbd_assert(size <= (size_t) U8_MAX);
-               op->cls.class_len = size;
-               op->payload_len = size;
-
-               op->cls.method_name = va_arg(args, char *);
-               size = strlen(op->cls.method_name);
-               rbd_assert(size <= (size_t) U8_MAX);
-               op->cls.method_len = size;
-               op->payload_len += size;
-
-               op->cls.argc = 0;
-               op->cls.indata = va_arg(args, void *);
-               size = va_arg(args, size_t);
-               rbd_assert(size <= (size_t) U32_MAX);
-               op->cls.indata_len = (u32) size;
-               op->payload_len += size;
-               break;
-       case CEPH_OSD_OP_NOTIFY_ACK:
-       case CEPH_OSD_OP_WATCH:
-               /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
-               /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
-               op->watch.cookie = va_arg(args, u64);
-               op->watch.ver = va_arg(args, u64);
-               op->watch.ver = cpu_to_le64(op->watch.ver);
-               if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
-                       op->watch.flag = (u8) 1;
-               break;
-       default:
-               rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
-               kfree(op);
-               op = NULL;
-               break;
-       }
-       va_end(args);
-
-       return op;
-}
-
-static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
-{
-       kfree(op);
-}
-
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
 {
@@ -1221,7 +1310,24 @@ static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
 
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
+
        dout("%s: img %p\n", __func__, img_request);
+
+       /*
+        * If no error occurred, compute the aggregate transfer
+        * count for the image request.  We could instead use
+        * atomic64_cmpxchg() to update it as each object request
+        * completes; not clear which way is better off hand.
+        */
+       if (!img_request->result) {
+               struct rbd_obj_request *obj_request;
+               u64 xferred = 0;
+
+               for_each_obj_request(img_request, obj_request)
+                       xferred += obj_request->xferred;
+               img_request->xferred = xferred;
+       }
+
        if (img_request->callback)
                img_request->callback(img_request);
        else
@@ -1237,39 +1343,56 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
        return wait_for_completion_interruptible(&obj_request->completion);
 }
 
-static void obj_request_done_init(struct rbd_obj_request *obj_request)
+/*
+ * The default/initial value for all image request flags is 0.  Each
+ * is conditionally set to 1 at image request initialization time
+ * and currently never change thereafter.
+ */
+static void img_request_write_set(struct rbd_img_request *img_request)
 {
-       atomic_set(&obj_request->done, 0);
-       smp_wmb();
+       set_bit(IMG_REQ_WRITE, &img_request->flags);
+       smp_mb();
 }
 
-static void obj_request_done_set(struct rbd_obj_request *obj_request)
+static bool img_request_write_test(struct rbd_img_request *img_request)
 {
-       int done;
+       smp_mb();
+       return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
+}
 
-       done = atomic_inc_return(&obj_request->done);
-       if (done > 1) {
-               struct rbd_img_request *img_request = obj_request->img_request;
-               struct rbd_device *rbd_dev;
+static void img_request_child_set(struct rbd_img_request *img_request)
+{
+       set_bit(IMG_REQ_CHILD, &img_request->flags);
+       smp_mb();
+}
 
-               rbd_dev = img_request ? img_request->rbd_dev : NULL;
-               rbd_warn(rbd_dev, "obj_request %p was already done\n",
-                       obj_request);
-       }
+static bool img_request_child_test(struct rbd_img_request *img_request)
+{
+       smp_mb();
+       return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
 }
 
-static bool obj_request_done_test(struct rbd_obj_request *obj_request)
+static void img_request_layered_set(struct rbd_img_request *img_request)
 {
+       set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
-       return atomic_read(&obj_request->done) != 0;
+}
+
+static bool img_request_layered_test(struct rbd_img_request *img_request)
+{
+       smp_mb();
+       return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
 }
 
 static void
 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 {
+       u64 xferred = obj_request->xferred;
+       u64 length = obj_request->length;
+
        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
-               obj_request->xferred, obj_request->length);
+               xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
@@ -1277,15 +1400,20 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
-       BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
+       rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
-               zero_bio_chain(obj_request->bio_list, 0);
+               if (obj_request->type == OBJ_REQUEST_BIO)
+                       zero_bio_chain(obj_request->bio_list, 0);
+               else
+                       zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
-               obj_request->xferred = obj_request->length;
-       } else if (obj_request->xferred < obj_request->length &&
-                       !obj_request->result) {
-               zero_bio_chain(obj_request->bio_list, obj_request->xferred);
-               obj_request->xferred = obj_request->length;
+               obj_request->xferred = length;
+       } else if (xferred < length && !obj_request->result) {
+               if (obj_request->type == OBJ_REQUEST_BIO)
+                       zero_bio_chain(obj_request->bio_list, xferred);
+               else
+                       zero_pages(obj_request->pages, xferred, length);
+               obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
 }
@@ -1308,9 +1436,23 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
 
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
-       dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
-               obj_request->result, obj_request->xferred, obj_request->length);
-       if (obj_request->img_request)
+       struct rbd_img_request *img_request = NULL;
+       bool layered = false;
+
+       if (obj_request_img_data_test(obj_request)) {
+               img_request = obj_request->img_request;
+               layered = img_request && img_request_layered_test(img_request);
+       } else {
+               img_request = NULL;
+               layered = false;
+       }
+
+       dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
+               obj_request, img_request, obj_request->result,
+               obj_request->xferred, obj_request->length);
+       if (layered && obj_request->result == -ENOENT)
+               rbd_img_parent_read(obj_request);
+       else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
@@ -1321,9 +1463,8 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
-        * There is no such thing as a successful short write.
-        * Our xferred value is the number of bytes transferred
-        * back.  Set it to our originally-requested length.
+        * There is no such thing as a successful short write.  Set
+        * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
@@ -1347,22 +1488,26 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
-       rbd_assert(!!obj_request->img_request ^
-                               (obj_request->which == BAD_WHICH));
+       if (obj_request_img_data_test(obj_request)) {
+               rbd_assert(obj_request->img_request);
+               rbd_assert(obj_request->which != BAD_WHICH);
+       } else {
+               rbd_assert(obj_request->which == BAD_WHICH);
+       }
 
        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
 
-       WARN_ON(osd_req->r_num_ops != 1);       /* For now */
+       BUG_ON(osd_req->r_num_ops > 2);
 
        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
-       rbd_assert(obj_request->xferred < (u64) UINT_MAX);
-       opcode = osd_req->r_request_ops[0].op;
+       rbd_assert(obj_request->xferred < (u64)UINT_MAX);
+       opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
@@ -1388,28 +1533,49 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_obj_request_complete(obj_request);
 }
 
+static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request = obj_request->img_request;
+       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       u64 snap_id;
+
+       rbd_assert(osd_req != NULL);
+
+       snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
+       ceph_osdc_build_request(osd_req, obj_request->offset,
+                       NULL, snap_id, NULL);
+}
+
+static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request = obj_request->img_request;
+       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       struct ceph_snap_context *snapc;
+       struct timespec mtime = CURRENT_TIME;
+
+       rbd_assert(osd_req != NULL);
+
+       snapc = img_request ? img_request->snapc : NULL;
+       ceph_osdc_build_request(osd_req, obj_request->offset,
+                       snapc, CEPH_NOSNAP, &mtime);
+}
+
 static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
-                                       struct rbd_obj_request *obj_request,
-                                       struct ceph_osd_req_op *op)
+                                       struct rbd_obj_request *obj_request)
 {
-       struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;
-       struct timespec now;
-       struct timespec *mtime;
-       u64 snap_id = CEPH_NOSNAP;
-       u64 offset = obj_request->offset;
-       u64 length = obj_request->length;
 
-       if (img_request) {
-               rbd_assert(img_request->write_request == write_request);
-               if (img_request->write_request)
+       if (obj_request_img_data_test(obj_request)) {
+               struct rbd_img_request *img_request = obj_request->img_request;
+
+               rbd_assert(write_request ==
+                               img_request_write_test(img_request));
+               if (write_request)
                        snapc = img_request->snapc;
-               else
-                       snap_id = img_request->snap_id;
        }
 
        /* Allocate and initialize the request, for the single op */
@@ -1419,31 +1585,10 @@ static struct ceph_osd_request *rbd_osd_req_create(
        if (!osd_req)
                return NULL;    /* ENOMEM */
 
-       rbd_assert(obj_request_type_valid(obj_request->type));
-       switch (obj_request->type) {
-       case OBJ_REQUEST_NODATA:
-               break;          /* Nothing to do */
-       case OBJ_REQUEST_BIO:
-               rbd_assert(obj_request->bio_list != NULL);
-               osd_req->r_bio = obj_request->bio_list;
-               break;
-       case OBJ_REQUEST_PAGES:
-               osd_req->r_pages = obj_request->pages;
-               osd_req->r_num_pages = obj_request->page_count;
-               osd_req->r_page_alignment = offset & ~PAGE_MASK;
-               break;
-       }
-
-       if (write_request) {
+       if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-               now = CURRENT_TIME;
-               mtime = &now;
-       } else {
+       else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;
-               mtime = NULL;   /* not needed for reads */
-               offset = 0;     /* These are not used... */
-               length = 0;     /* ...for osd read requests */
-       }
 
        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;
@@ -1454,14 +1599,51 @@ static struct ceph_osd_request *rbd_osd_req_create(
 
        osd_req->r_file_layout = rbd_dev->layout;       /* struct */
 
-       /* osd_req will get its own reference to snapc (if non-null) */
+       return osd_req;
+}
+
+/*
+ * Create a copyup osd request based on the information in the
+ * object request supplied.  A copyup request has two osd ops,
+ * a copyup method call, and a "normal" write request.
+ */
+static struct ceph_osd_request *
+rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       struct ceph_snap_context *snapc;
+       struct rbd_device *rbd_dev;
+       struct ceph_osd_client *osdc;
+       struct ceph_osd_request *osd_req;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       img_request = obj_request->img_request;
+       rbd_assert(img_request);
+       rbd_assert(img_request_write_test(img_request));
+
+       /* Allocate and initialize the request, for the two ops */
+
+       snapc = img_request->snapc;
+       rbd_dev = img_request->rbd_dev;
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
+       if (!osd_req)
+               return NULL;    /* ENOMEM */
+
+       osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+       osd_req->r_callback = rbd_osd_req_callback;
+       osd_req->r_priv = obj_request;
+
+       osd_req->r_oid_len = strlen(obj_request->object_name);
+       rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+       memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
 
-       ceph_osdc_build_request(osd_req, offset, length, 1, op,
-                               snapc, snap_id, mtime);
+       osd_req->r_file_layout = rbd_dev->layout;       /* struct */
 
        return osd_req;
 }
 
+
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
        ceph_osdc_put_request(osd_req);
@@ -1488,10 +1670,10 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
+       obj_request->flags = 0;
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
-       obj_request_done_init(obj_request);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);
 
@@ -1541,7 +1723,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
 static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
-                                       bool write_request)
+                                       bool write_request,
+                                       bool child_request)
 {
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc = NULL;
@@ -1558,189 +1741,633 @@ static struct rbd_img_request *rbd_img_request_create(
                        kfree(img_request);
                        return NULL;    /* Shouldn't happen */
                }
+
        }
 
        img_request->rq = NULL;
        img_request->rbd_dev = rbd_dev;
        img_request->offset = offset;
        img_request->length = length;
-       img_request->write_request = write_request;
-       if (write_request)
+       img_request->flags = 0;
+       if (write_request) {
+               img_request_write_set(img_request);
                img_request->snapc = snapc;
-       else
+       } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
+       }
+       if (child_request)
+               img_request_child_set(img_request);
+       if (rbd_dev->parent_spec)
+               img_request_layered_set(img_request);
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
        img_request->callback = NULL;
+       img_request->result = 0;
        img_request->obj_request_count = 0;
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
-       rbd_img_request_get(img_request);       /* Avoid a warning */
-       rbd_img_request_put(img_request);       /* TEMPORARY */
+       rbd_img_request_get(img_request);       /* Avoid a warning */
+       rbd_img_request_put(img_request);       /* TEMPORARY */
+
+       dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
+               write_request ? "write" : "read", offset, length,
+               img_request);
+
+       return img_request;
+}
+
+static void rbd_img_request_destroy(struct kref *kref)
+{
+       struct rbd_img_request *img_request;
+       struct rbd_obj_request *obj_request;
+       struct rbd_obj_request *next_obj_request;
+
+       img_request = container_of(kref, struct rbd_img_request, kref);
+
+       dout("%s: img %p\n", __func__, img_request);
+
+       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+               rbd_img_obj_request_del(img_request, obj_request);
+       rbd_assert(img_request->obj_request_count == 0);
+
+       if (img_request_write_test(img_request))
+               ceph_put_snap_context(img_request->snapc);
+
+       if (img_request_child_test(img_request))
+               rbd_obj_request_put(img_request->obj_request);
+
+       kfree(img_request);
+}
+
+static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       unsigned int xferred;
+       int result;
+       bool more;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       img_request = obj_request->img_request;
+
+       rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
+       xferred = (unsigned int)obj_request->xferred;
+       result = obj_request->result;
+       if (result) {
+               struct rbd_device *rbd_dev = img_request->rbd_dev;
+
+               rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
+                       img_request_write_test(img_request) ? "write" : "read",
+                       obj_request->length, obj_request->img_offset,
+                       obj_request->offset);
+               rbd_warn(rbd_dev, "  result %d xferred %x\n",
+                       result, xferred);
+               if (!img_request->result)
+                       img_request->result = result;
+       }
+
+       /* Image object requests don't own their page array */
+
+       if (obj_request->type == OBJ_REQUEST_PAGES) {
+               obj_request->pages = NULL;
+               obj_request->page_count = 0;
+       }
+
+       if (img_request_child_test(img_request)) {
+               rbd_assert(img_request->obj_request != NULL);
+               more = obj_request->which < img_request->obj_request_count - 1;
+       } else {
+               rbd_assert(img_request->rq != NULL);
+               more = blk_end_request(img_request->rq, result, xferred);
+       }
+
+       return more;
+}
+
+static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       u32 which = obj_request->which;
+       bool more = true;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       img_request = obj_request->img_request;
+
+       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
+       rbd_assert(img_request != NULL);
+       rbd_assert(img_request->obj_request_count > 0);
+       rbd_assert(which != BAD_WHICH);
+       rbd_assert(which < img_request->obj_request_count);
+       rbd_assert(which >= img_request->next_completion);
+
+       spin_lock_irq(&img_request->completion_lock);
+       if (which != img_request->next_completion)
+               goto out;
+
+       for_each_obj_request_from(img_request, obj_request) {
+               rbd_assert(more);
+               rbd_assert(which < img_request->obj_request_count);
+
+               if (!obj_request_done_test(obj_request))
+                       break;
+               more = rbd_img_obj_end_request(obj_request);
+               which++;
+       }
+
+       rbd_assert(more ^ (which == img_request->obj_request_count));
+       img_request->next_completion = which;
+out:
+       spin_unlock_irq(&img_request->completion_lock);
+
+       if (!more)
+               rbd_img_request_complete(img_request);
+}
+
+/*
+ * Split up an image request into one or more object requests, each
+ * to a different object.  The "type" parameter indicates whether
+ * "data_desc" is the pointer to the head of a list of bio
+ * structures, or the base of a page array.  In either case this
+ * function assumes data_desc describes memory sufficient to hold
+ * all data described by the image request.
+ */
+static int rbd_img_request_fill(struct rbd_img_request *img_request,
+                                       enum obj_request_type type,
+                                       void *data_desc)
+{
+       struct rbd_device *rbd_dev = img_request->rbd_dev;
+       struct rbd_obj_request *obj_request = NULL;
+       struct rbd_obj_request *next_obj_request;
+       bool write_request = img_request_write_test(img_request);
+       struct bio *bio_list;
+       unsigned int bio_offset = 0;
+       struct page **pages;
+       u64 img_offset;
+       u64 resid;
+       u16 opcode;
+
+       dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
+               (int)type, data_desc);
+
+       opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
+       img_offset = img_request->offset;
+       resid = img_request->length;
+       rbd_assert(resid > 0);
+
+       if (type == OBJ_REQUEST_BIO) {
+               bio_list = data_desc;
+               rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
+       } else {
+               rbd_assert(type == OBJ_REQUEST_PAGES);
+               pages = data_desc;
+       }
+
+       while (resid) {
+               struct ceph_osd_request *osd_req;
+               const char *object_name;
+               u64 offset;
+               u64 length;
+
+               object_name = rbd_segment_name(rbd_dev, img_offset);
+               if (!object_name)
+                       goto out_unwind;
+               offset = rbd_segment_offset(rbd_dev, img_offset);
+               length = rbd_segment_length(rbd_dev, img_offset, resid);
+               obj_request = rbd_obj_request_create(object_name,
+                                               offset, length, type);
+               kfree(object_name);     /* object request has its own copy */
+               if (!obj_request)
+                       goto out_unwind;
+
+               if (type == OBJ_REQUEST_BIO) {
+                       unsigned int clone_size;
+
+                       rbd_assert(length <= (u64)UINT_MAX);
+                       clone_size = (unsigned int)length;
+                       obj_request->bio_list =
+                                       bio_chain_clone_range(&bio_list,
+                                                               &bio_offset,
+                                                               clone_size,
+                                                               GFP_ATOMIC);
+                       if (!obj_request->bio_list)
+                               goto out_partial;
+               } else {
+                       unsigned int page_count;
+
+                       obj_request->pages = pages;
+                       page_count = (u32)calc_pages_for(offset, length);
+                       obj_request->page_count = page_count;
+                       if ((offset + length) & ~PAGE_MASK)
+                               page_count--;   /* more on last page */
+                       pages += page_count;
+               }
+
+               osd_req = rbd_osd_req_create(rbd_dev, write_request,
+                                               obj_request);
+               if (!osd_req)
+                       goto out_partial;
+               obj_request->osd_req = osd_req;
+               obj_request->callback = rbd_img_obj_callback;
+
+               osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
+                                               0, 0);
+               if (type == OBJ_REQUEST_BIO)
+                       osd_req_op_extent_osd_data_bio(osd_req, 0,
+                                       obj_request->bio_list, length);
+               else
+                       osd_req_op_extent_osd_data_pages(osd_req, 0,
+                                       obj_request->pages, length,
+                                       offset & ~PAGE_MASK, false, false);
+
+               if (write_request)
+                       rbd_osd_req_format_write(obj_request);
+               else
+                       rbd_osd_req_format_read(obj_request);
+
+               obj_request->img_offset = img_offset;
+               rbd_img_obj_request_add(img_request, obj_request);
+
+               img_offset += length;
+               resid -= length;
+       }
+
+       return 0;
+
+out_partial:
+       rbd_obj_request_put(obj_request);
+out_unwind:
+       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+               rbd_obj_request_put(obj_request);
+
+       return -ENOMEM;
+}
+
+static void
+rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       struct rbd_device *rbd_dev;
+       u64 length;
+       u32 page_count;
+
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_img_data_test(obj_request));
+       img_request = obj_request->img_request;
+       rbd_assert(img_request);
+
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev);
+       length = (u64)1 << rbd_dev->header.obj_order;
+       page_count = (u32)calc_pages_for(0, length);
+
+       rbd_assert(obj_request->copyup_pages);
+       ceph_release_page_vector(obj_request->copyup_pages, page_count);
+       obj_request->copyup_pages = NULL;
+
+       /*
+        * We want the transfer count to reflect the size of the
+        * original write request.  There is no such thing as a
+        * successful short write, so if the request was successful
+        * we can just set it to the originally-requested length.
+        */
+       if (!obj_request->result)
+               obj_request->xferred = obj_request->length;
+
+       /* Finish up with the normal image object callback */
+
+       rbd_img_obj_callback(obj_request);
+}
+
+static void
+rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *orig_request;
+       struct ceph_osd_request *osd_req;
+       struct ceph_osd_client *osdc;
+       struct rbd_device *rbd_dev;
+       struct page **pages;
+       int result;
+       u64 obj_size;
+       u64 xferred;
+
+       rbd_assert(img_request_child_test(img_request));
+
+       /* First get what we need from the image request */
+
+       pages = img_request->copyup_pages;
+       rbd_assert(pages != NULL);
+       img_request->copyup_pages = NULL;
+
+       orig_request = img_request->obj_request;
+       rbd_assert(orig_request != NULL);
+       rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+       result = img_request->result;
+       obj_size = img_request->length;
+       xferred = img_request->xferred;
+
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev);
+       rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
+
+       rbd_img_request_put(img_request);
+
+       if (result)
+               goto out_err;
+
+       /* Allocate the new copyup osd request for the original request */
 
-       dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
-               write_request ? "write" : "read", offset, length,
-               img_request);
+       result = -ENOMEM;
+       rbd_assert(!orig_request->osd_req);
+       osd_req = rbd_osd_req_create_copyup(orig_request);
+       if (!osd_req)
+               goto out_err;
+       orig_request->osd_req = osd_req;
+       orig_request->copyup_pages = pages;
 
-       return img_request;
-}
+       /* Initialize the copyup op */
 
-static void rbd_img_request_destroy(struct kref *kref)
-{
-       struct rbd_img_request *img_request;
-       struct rbd_obj_request *obj_request;
-       struct rbd_obj_request *next_obj_request;
+       osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
+       osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+                                               false, false);
 
-       img_request = container_of(kref, struct rbd_img_request, kref);
+       /* Then the original write request op */
 
-       dout("%s: img %p\n", __func__, img_request);
+       osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
+                                       orig_request->offset,
+                                       orig_request->length, 0, 0);
+       osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
+                                       orig_request->length);
 
-       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
-               rbd_img_obj_request_del(img_request, obj_request);
-       rbd_assert(img_request->obj_request_count == 0);
+       rbd_osd_req_format_write(orig_request);
 
-       if (img_request->write_request)
-               ceph_put_snap_context(img_request->snapc);
+       /* All set, send it off. */
 
-       kfree(img_request);
+       orig_request->callback = rbd_img_obj_copyup_callback;
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       result = rbd_obj_request_submit(osdc, orig_request);
+       if (!result)
+               return;
+out_err:
+       /* Record the error code and complete the request */
+
+       orig_request->result = result;
+       orig_request->xferred = 0;
+       obj_request_done_set(orig_request);
+       rbd_obj_request_complete(orig_request);
 }
 
-static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
-                                       struct bio *bio_list)
+/*
+ * Read from the parent image the range of data that covers the
+ * entire target of the given object request.  This is used for
+ * satisfying a layered image write request when the target of an
+ * object request from the image request does not exist.
+ *
+ * A page array big enough to hold the returned data is allocated
+ * and supplied to rbd_img_request_fill() as the "data descriptor."
+ * When the read completes, this page array will be transferred to
+ * the original object request for the copyup operation.
+ *
+ * If an error occurs, record it as the result of the original
+ * object request and mark it done so it gets completed.
+ */
+static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 {
-       struct rbd_device *rbd_dev = img_request->rbd_dev;
-       struct rbd_obj_request *obj_request = NULL;
-       struct rbd_obj_request *next_obj_request;
-       unsigned int bio_offset;
-       u64 image_offset;
-       u64 resid;
-       u16 opcode;
+       struct rbd_img_request *img_request = NULL;
+       struct rbd_img_request *parent_request = NULL;
+       struct rbd_device *rbd_dev;
+       u64 img_offset;
+       u64 length;
+       struct page **pages = NULL;
+       u32 page_count;
+       int result;
 
-       dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+       rbd_assert(obj_request_img_data_test(obj_request));
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
 
-       opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
-                                             : CEPH_OSD_OP_READ;
-       bio_offset = 0;
-       image_offset = img_request->offset;
-       rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
-       resid = img_request->length;
-       rbd_assert(resid > 0);
-       while (resid) {
-               const char *object_name;
-               unsigned int clone_size;
-               struct ceph_osd_req_op *op;
-               u64 offset;
-               u64 length;
+       img_request = obj_request->img_request;
+       rbd_assert(img_request != NULL);
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev->parent != NULL);
 
-               object_name = rbd_segment_name(rbd_dev, image_offset);
-               if (!object_name)
-                       goto out_unwind;
-               offset = rbd_segment_offset(rbd_dev, image_offset);
-               length = rbd_segment_length(rbd_dev, image_offset, resid);
-               obj_request = rbd_obj_request_create(object_name,
-                                               offset, length,
-                                               OBJ_REQUEST_BIO);
-               kfree(object_name);     /* object request has its own copy */
-               if (!obj_request)
-                       goto out_unwind;
+       /*
+        * First things first.  The original osd request is of no
+        * use to us any more, so we'll need a new one that can hold
+        * the two ops in a copyup request.  We'll get that later,
+        * but for now we can release the old one.
+        */
+       rbd_osd_req_destroy(obj_request->osd_req);
+       obj_request->osd_req = NULL;
 
-               rbd_assert(length <= (u64) UINT_MAX);
-               clone_size = (unsigned int) length;
-               obj_request->bio_list = bio_chain_clone_range(&bio_list,
-                                               &bio_offset, clone_size,
-                                               GFP_ATOMIC);
-               if (!obj_request->bio_list)
-                       goto out_partial;
+       /*
+        * Determine the byte range covered by the object in the
+        * child image to which the original request was to be sent.
+        */
+       img_offset = obj_request->img_offset - obj_request->offset;
+       length = (u64)1 << rbd_dev->header.obj_order;
 
-               /*
-                * Build up the op to use in building the osd
-                * request.  Note that the contents of the op are
-                * copied by rbd_osd_req_create().
-                */
-               op = rbd_osd_req_op_create(opcode, offset, length);
-               if (!op)
-                       goto out_partial;
-               obj_request->osd_req = rbd_osd_req_create(rbd_dev,
-                                               img_request->write_request,
-                                               obj_request, op);
-               rbd_osd_req_op_destroy(op);
-               if (!obj_request->osd_req)
-                       goto out_partial;
-               /* status and version are initially zero-filled */
+       /*
+        * Allocate a page array big enough to receive the data read
+        * from the parent.
+        */
+       page_count = (u32)calc_pages_for(0, length);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages)) {
+               result = PTR_ERR(pages);
+               pages = NULL;
+               goto out_err;
+       }
 
-               rbd_img_obj_request_add(img_request, obj_request);
+       result = -ENOMEM;
+       parent_request = rbd_img_request_create(rbd_dev->parent,
+                                               img_offset, length,
+                                               false, true);
+       if (!parent_request)
+               goto out_err;
+       rbd_obj_request_get(obj_request);
+       parent_request->obj_request = obj_request;
 
-               image_offset += length;
-               resid -= length;
-       }
+       result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
+       if (result)
+               goto out_err;
+       parent_request->copyup_pages = pages;
 
-       return 0;
+       parent_request->callback = rbd_img_obj_parent_read_full_callback;
+       result = rbd_img_request_submit(parent_request);
+       if (!result)
+               return 0;
 
-out_partial:
+       parent_request->copyup_pages = NULL;
+       parent_request->obj_request = NULL;
        rbd_obj_request_put(obj_request);
-out_unwind:
-       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
-               rbd_obj_request_put(obj_request);
+out_err:
+       if (pages)
+               ceph_release_page_vector(pages, page_count);
+       if (parent_request)
+               rbd_img_request_put(parent_request);
+       obj_request->result = result;
+       obj_request->xferred = 0;
+       obj_request_done_set(obj_request);
 
-       return -ENOMEM;
+       return result;
 }
 
-static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 {
-       struct rbd_img_request *img_request;
-       u32 which = obj_request->which;
-       bool more = true;
+       struct rbd_obj_request *orig_request;
+       int result;
 
-       img_request = obj_request->img_request;
+       rbd_assert(!obj_request_img_data_test(obj_request));
 
-       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
-       rbd_assert(img_request != NULL);
-       rbd_assert(img_request->rq != NULL);
-       rbd_assert(img_request->obj_request_count > 0);
-       rbd_assert(which != BAD_WHICH);
-       rbd_assert(which < img_request->obj_request_count);
-       rbd_assert(which >= img_request->next_completion);
+       /*
+        * All we need from the object request is the original
+        * request and the result of the STAT op.  Grab those, then
+        * we're done with the request.
+        */
+       orig_request = obj_request->obj_request;
+       obj_request->obj_request = NULL;
+       rbd_assert(orig_request);
+       rbd_assert(orig_request->img_request);
 
-       spin_lock_irq(&img_request->completion_lock);
-       if (which != img_request->next_completion)
+       result = obj_request->result;
+       obj_request->result = 0;
+
+       dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
+               obj_request, orig_request, result,
+               obj_request->xferred, obj_request->length);
+       rbd_obj_request_put(obj_request);
+
+       rbd_assert(orig_request);
+       rbd_assert(orig_request->img_request);
+
+       /*
+        * Our only purpose here is to determine whether the object
+        * exists, and we don't want to treat the non-existence as
+        * an error.  If something else comes back, transfer the
+        * error to the original request and complete it now.
+        */
+       if (!result) {
+               obj_request_existence_set(orig_request, true);
+       } else if (result == -ENOENT) {
+               obj_request_existence_set(orig_request, false);
+       } else if (result) {
+               orig_request->result = result;
                goto out;
+       }
 
-       for_each_obj_request_from(img_request, obj_request) {
-               unsigned int xferred;
-               int result;
+       /*
+        * Resubmit the original request now that we have recorded
+        * whether the target object exists.
+        */
+       orig_request->result = rbd_img_obj_request_submit(orig_request);
+out:
+       if (orig_request->result)
+               rbd_obj_request_complete(orig_request);
+       rbd_obj_request_put(orig_request);
+}
 
-               rbd_assert(more);
-               rbd_assert(which < img_request->obj_request_count);
+static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
+{
+       struct rbd_obj_request *stat_request;
+       struct rbd_device *rbd_dev;
+       struct ceph_osd_client *osdc;
+       struct page **pages = NULL;
+       u32 page_count;
+       size_t size;
+       int ret;
 
-               if (!obj_request_done_test(obj_request))
-                       break;
+       /*
+        * The response data for a STAT call consists of:
+        *     le64 length;
+        *     struct {
+        *         le32 tv_sec;
+        *         le32 tv_nsec;
+        *     } mtime;
+        */
+       size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
+       page_count = (u32)calc_pages_for(0, size);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
 
-               rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
-               xferred = (unsigned int) obj_request->xferred;
-               result = (int) obj_request->result;
-               if (result)
-                       rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
-                               img_request->write_request ? "write" : "read",
-                               result, xferred);
+       ret = -ENOMEM;
+       stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+                                                       OBJ_REQUEST_PAGES);
+       if (!stat_request)
+               goto out;
 
-               more = blk_end_request(img_request->rq, result, xferred);
-               which++;
-       }
+       rbd_obj_request_get(obj_request);
+       stat_request->obj_request = obj_request;
+       stat_request->pages = pages;
+       stat_request->page_count = page_count;
+
+       rbd_assert(obj_request->img_request);
+       rbd_dev = obj_request->img_request->rbd_dev;
+       stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               stat_request);
+       if (!stat_request->osd_req)
+               goto out;
+       stat_request->callback = rbd_img_obj_exists_callback;
 
-       rbd_assert(more ^ (which == img_request->obj_request_count));
-       img_request->next_completion = which;
+       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
+       osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+                                       false, false);
+       rbd_osd_req_format_read(stat_request);
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, stat_request);
 out:
-       spin_unlock_irq(&img_request->completion_lock);
+       if (ret)
+               rbd_obj_request_put(obj_request);
 
-       if (!more)
-               rbd_img_request_complete(img_request);
+       return ret;
+}
+
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       bool known;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+
+       img_request = obj_request->img_request;
+       rbd_assert(img_request);
+
+       /*
+        * Only layered writes need special handling.  If it's not a
+        * layered write, or it is a layered write but we know the
+        * target object exists, it's no different from any other
+        * object request.
+        */
+       if (!img_request_write_test(img_request) ||
+               !img_request_layered_test(img_request) ||
+               ((known = obj_request_known_test(obj_request)) &&
+                       obj_request_exists_test(obj_request))) {
+
+               struct rbd_device *rbd_dev;
+               struct ceph_osd_client *osdc;
+
+               rbd_dev = obj_request->img_request->rbd_dev;
+               osdc = &rbd_dev->rbd_client->client->osdc;
+
+               return rbd_obj_request_submit(osdc, obj_request);
+       }
+
+       /*
+        * It's a layered write.  The target object might exist but
+        * we may not know that yet.  If we know it doesn't exist,
+        * start by reading the data for the full target object from
+        * the parent so we can use it for a copyup to the target.
+        */
+       if (known)
+               return rbd_img_obj_parent_read_full(obj_request);
+
+       /* We don't know whether the target exists.  Go find out. */
+
+       return rbd_img_obj_exists_submit(obj_request);
 }
 
 static int rbd_img_request_submit(struct rbd_img_request *img_request)
 {
-       struct rbd_device *rbd_dev = img_request->rbd_dev;
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;
 
@@ -1748,27 +2375,78 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
                int ret;
 
-               obj_request->callback = rbd_img_obj_callback;
-               ret = rbd_obj_request_submit(osdc, obj_request);
+               ret = rbd_img_obj_request_submit(obj_request);
                if (ret)
                        return ret;
-               /*
-                * The image request has its own reference to each
-                * of its object requests, so we can safely drop the
-                * initial one here.
-                */
-               rbd_obj_request_put(obj_request);
        }
 
        return 0;
 }
 
+static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *obj_request;
+
+       rbd_assert(img_request_child_test(img_request));
+
+       obj_request = img_request->obj_request;
+       rbd_assert(obj_request != NULL);
+       obj_request->result = img_request->result;
+       obj_request->xferred = img_request->xferred;
+
+       rbd_img_obj_request_read_callback(obj_request);
+       rbd_obj_request_complete(obj_request);
+}
+
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
+{
+       struct rbd_device *rbd_dev;
+       struct rbd_img_request *img_request;
+       int result;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       rbd_assert(obj_request->img_request != NULL);
+       rbd_assert(obj_request->result == (s32) -ENOENT);
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+       rbd_dev = obj_request->img_request->rbd_dev;
+       rbd_assert(rbd_dev->parent != NULL);
+       /* rbd_read_finish(obj_request, obj_request->length); */
+       img_request = rbd_img_request_create(rbd_dev->parent,
+                                               obj_request->img_offset,
+                                               obj_request->length,
+                                               false, true);
+       result = -ENOMEM;
+       if (!img_request)
+               goto out_err;
+
+       rbd_obj_request_get(obj_request);
+       img_request->obj_request = obj_request;
+
+       result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                       obj_request->bio_list);
+       if (result)
+               goto out_err;
+
+       img_request->callback = rbd_img_parent_read_callback;
+       result = rbd_img_request_submit(img_request);
+       if (result)
+               goto out_err;
+
+       return;
+out_err:
+       if (img_request)
+               rbd_img_request_put(img_request);
+       obj_request->result = result;
+       obj_request->xferred = 0;
+       obj_request_done_set(obj_request);
+}
+
 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver, u64 notify_id)
 {
        struct rbd_obj_request *obj_request;
-       struct ceph_osd_req_op *op;
-       struct ceph_osd_client *osdc;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;
 
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
@@ -1777,17 +2455,15 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
                return -ENOMEM;
 
        ret = -ENOMEM;
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
-       if (!op)
-               goto out;
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
-                                               obj_request, op);
-       rbd_osd_req_op_destroy(op);
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
-
-       osdc = &rbd_dev->rbd_client->client->osdc;
        obj_request->callback = rbd_obj_request_put;
+
+       osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
+                                       notify_id, ver, 0);
+       rbd_osd_req_format_read(obj_request);
+
        ret = rbd_obj_request_submit(osdc, obj_request);
 out:
        if (ret)
@@ -1824,7 +2500,6 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
-       struct ceph_osd_req_op *op;
        int ret;
 
        rbd_assert(start ^ !!rbd_dev->watch_event);
@@ -1844,14 +2519,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
        if (!obj_request)
                goto out_cancel;
 
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie,
-                               rbd_dev->header.obj_version, start);
-       if (!op)
-               goto out_cancel;
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
-                                                       obj_request, op);
-       rbd_osd_req_op_destroy(op);
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
        if (!obj_request->osd_req)
                goto out_cancel;
 
@@ -1860,6 +2528,12 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
        else
                ceph_osdc_unregister_linger_request(osdc,
                                        rbd_dev->watch_request->osd_req);
+
+       osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+                               rbd_dev->watch_event->cookie,
+                               rbd_dev->header.obj_version, start);
+       rbd_osd_req_format_write(obj_request);
+
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out_cancel;
@@ -1911,20 +2585,18 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                             size_t inbound_size,
                             u64 *version)
 {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
-       struct ceph_osd_client *osdc;
-       struct ceph_osd_req_op *op;
        struct page **pages;
        u32 page_count;
        int ret;
 
        /*
-        * Method calls are ultimately read operations but they
-        * don't involve object data (so no offset or length).
-        * The result should placed into the inbound buffer
-        * provided.  They also supply outbound data--parameters for
-        * the object method.  Currently if this is present it will
-        * be a snapshot id.
+        * Method calls are ultimately read operations.  The result
+        * should placed into the inbound buffer provided.  They
+        * also supply outbound data--parameters for the object
+        * method.  Currently if this is present it will be a
+        * snapshot id.
         */
        page_count = (u32) calc_pages_for(0, inbound_size);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
@@ -1932,7 +2604,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                return PTR_ERR(pages);
 
        ret = -ENOMEM;
-       obj_request = rbd_obj_request_create(object_name, 0, 0,
+       obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
                                                        OBJ_REQUEST_PAGES);
        if (!obj_request)
                goto out;
@@ -1940,17 +2612,29 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
        obj_request->pages = pages;
        obj_request->page_count = page_count;
 
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
-                                       method_name, outbound, outbound_size);
-       if (!op)
-               goto out;
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
-                                               obj_request, op);
-       rbd_osd_req_op_destroy(op);
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
 
-       osdc = &rbd_dev->rbd_client->client->osdc;
+       osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
+                                       class_name, method_name);
+       if (outbound_size) {
+               struct ceph_pagelist *pagelist;
+
+               pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+               if (!pagelist)
+                       goto out;
+
+               ceph_pagelist_init(pagelist);
+               ceph_pagelist_append(pagelist, outbound, outbound_size);
+               osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
+                                               pagelist);
+       }
+       osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
+                                       obj_request->pages, inbound_size,
+                                       0, false, false);
+       rbd_osd_req_format_read(obj_request);
+
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out;
@@ -2039,13 +2723,14 @@ static void rbd_request_fn(struct request_queue *q)
 
                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
-                                                       write_request);
+                                                       write_request, false);
                if (!img_request)
                        goto end_request;
 
                img_request->rq = rq;
 
-               result = rbd_img_request_fill_bio(img_request, rq->bio);
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                               rq->bio);
                if (!result)
                        result = rbd_img_request_submit(img_request);
                if (result)
@@ -2053,8 +2738,10 @@ static void rbd_request_fn(struct request_queue *q)
 end_request:
                spin_lock_irq(q->queue_lock);
                if (result < 0) {
-                       rbd_warn(rbd_dev, "obj_request %s result %d\n",
-                               write_request ? "write" : "read", result);
+                       rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
+                               write_request ? "write" : "read",
+                               length, offset, result);
+
                        __blk_end_request_all(rq, result);
                }
        }
@@ -2126,9 +2813,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
                                char *buf, u64 *version)
 
 {
-       struct ceph_osd_req_op *op;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
-       struct ceph_osd_client *osdc;
        struct page **pages = NULL;
        u32 page_count;
        size_t size;
@@ -2148,16 +2834,19 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        obj_request->pages = pages;
        obj_request->page_count = page_count;
 
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
-       if (!op)
-               goto out;
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
-                                               obj_request, op);
-       rbd_osd_req_op_destroy(op);
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
 
-       osdc = &rbd_dev->rbd_client->client->osdc;
+       osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
+                                       offset, length, 0, 0);
+       osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
+                                       obj_request->pages,
+                                       obj_request->length,
+                                       obj_request->offset & ~PAGE_MASK,
+                                       false, false);
+       rbd_osd_req_format_read(obj_request);
+
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                goto out;
@@ -2351,6 +3040,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
        else
                ret = rbd_dev_v2_refresh(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);
+       revalidate_disk(rbd_dev->disk);
 
        return ret;
 }
@@ -2694,8 +3384,6 @@ static struct rbd_spec *rbd_spec_alloc(void)
                return NULL;
        kref_init(&spec->kref);
 
-       rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
-
        return spec;
 }
 
@@ -2927,7 +3615,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                return ret;
 
        incompat = le64_to_cpu(features_buf.incompat);
-       if (incompat & ~RBD_FEATURES_ALL)
+       if (incompat & ~RBD_FEATURES_SUPPORTED)
                return -ENXIO;
 
        *snap_features = le64_to_cpu(features_buf.features);
@@ -3790,6 +4478,11 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        void *response;
        void *p;
 
+       /* If we already have it we don't need to look it up */
+
+       if (rbd_dev->spec->image_id)
+               return 0;
+
        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
@@ -3967,6 +4660,9 @@ out_err:
 
 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 {
+       struct rbd_device *parent = NULL;
+       struct rbd_spec *parent_spec = NULL;
+       struct rbd_client *rbdc = NULL;
        int ret;
 
        /* no need to lock here, as rbd_dev is not registered yet */
@@ -4011,6 +4707,31 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */
+       /* Probe the parent if there is one */
+
+       if (rbd_dev->parent_spec) {
+               /*
+                * We need to pass a reference to the client and the
+                * parent spec when creating the parent rbd_dev.
+                * Images related by parent/child relationships
+                * always share both.
+                */
+               parent_spec = rbd_spec_get(rbd_dev->parent_spec);
+               rbdc = __rbd_get_client(rbd_dev->rbd_client);
+
+               parent = rbd_dev_create(rbdc, parent_spec);
+               if (!parent) {
+                       ret = -ENOMEM;
+                       goto err_out_spec;
+               }
+               rbdc = NULL;            /* parent now owns reference */
+               parent_spec = NULL;     /* parent now owns reference */
+               ret = rbd_dev_probe(parent);
+               if (ret < 0)
+                       goto err_out_parent;
+               rbd_dev->parent = parent;
+       }
+
        down_write(&rbd_dev->header_rwsem);
        ret = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
@@ -4029,6 +4750,12 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
                (unsigned long long) rbd_dev->mapping.size);
 
        return ret;
+
+err_out_parent:
+       rbd_dev_destroy(parent);
+err_out_spec:
+       rbd_spec_put(parent_spec);
+       rbd_put_client(rbdc);
 err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */
 
@@ -4192,6 +4919,12 @@ static void rbd_dev_release(struct device *dev)
        module_put(THIS_MODULE);
 }
 
+/*
+ * Tear down one rbd device: drop all of its snapshots, then delete
+ * it from the rbd bus (which handles the remaining rbd_dev cleanup).
+ * Factored out so rbd_remove() can apply it to each ancestor in a
+ * parent chain as well as to the mapped device itself.
+ */
+static void __rbd_remove(struct rbd_device *rbd_dev)
+{
+       rbd_remove_all_snaps(rbd_dev);
+       rbd_bus_del_dev(rbd_dev);
+}
+
 static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
@@ -4227,8 +4960,26 @@ static ssize_t rbd_remove(struct bus_type *bus,
        if (ret < 0)
                goto done;
 
-       rbd_remove_all_snaps(rbd_dev);
-       rbd_bus_del_dev(rbd_dev);
+       while (rbd_dev->parent_spec) {
+               struct rbd_device *first = rbd_dev;
+               struct rbd_device *second = first->parent;
+               struct rbd_device *third;
+
+               /*
+                * Follow to the parent with no grandparent and
+                * remove it.
+                */
+               while (second && (third = second->parent)) {
+                       first = second;
+                       second = third;
+               }
+               __rbd_remove(second);
+               rbd_spec_put(first->parent_spec);
+               first->parent_spec = NULL;
+               first->parent_overlap = 0;
+               first->parent = NULL;
+       }
+       __rbd_remove(rbd_dev);
 
 done:
        mutex_unlock(&ctl_mutex);