git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - drivers/block/rbd.c
Merge tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client
[karo-tx-linux.git] / drivers / block / rbd.c
index 362cecc77130260459d81d18d8853f39a7eb35eb..4d680772379828423d8605b1cae8c5da271ec5b8 100644 (file)
@@ -123,9 +123,11 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_LAYERING   (1<<0)
 #define RBD_FEATURE_STRIPINGV2 (1<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
+#define RBD_FEATURE_DATA_POOL (1<<7)
 #define RBD_FEATURES_ALL       (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
-                                RBD_FEATURE_EXCLUSIVE_LOCK)
+                                RBD_FEATURE_EXCLUSIVE_LOCK |   \
+                                RBD_FEATURE_DATA_POOL)
 
 /* Features supported by this (client software) implementation. */
 
@@ -144,10 +146,9 @@ struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
-       __u8 crypt_type;
-       __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
+       s64 data_pool_id;
        u64 features;           /* Might be changeable someday? */
 
        /* The remaining fields need to be updated occasionally */
@@ -230,7 +231,7 @@ enum obj_req_flags {
 };
 
 struct rbd_obj_request {
-       const char              *object_name;
+       u64                     object_no;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;
@@ -438,7 +439,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
 
 static struct kmem_cache       *rbd_img_request_cache;
 static struct kmem_cache       *rbd_obj_request_cache;
-static struct kmem_cache       *rbd_segment_name_cache;
 
 static int rbd_major;
 static DEFINE_IDA(rbd_dev_id_ida);
@@ -972,6 +972,30 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
        return true;
 }
 
+/*
+ * Returns the size in bytes of one object in the image (1 << obj_order).
+ */
+static u32 rbd_obj_bytes(struct rbd_image_header *header)
+{
+       return 1U << header->obj_order; /* u32 math: obj_order must be < 32 */
+}
+
+static void rbd_init_layout(struct rbd_device *rbd_dev)
+{
+       if (rbd_dev->header.stripe_unit == 0 ||
+           rbd_dev->header.stripe_count == 0) { /* either is 0: use defaults */
+               rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
+               rbd_dev->header.stripe_count = 1;
+       }
+       /* Fill layout from header; use spec pool if no data pool is set. */
+       rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
+       rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
+       rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
+       rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
+                         rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
+       RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); /* no pool namespace */
+}
+
 /*
  * Fill an rbd image header with information from the given format 1
  * on-disk header.
@@ -992,15 +1016,11 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
        /* Allocate this now to avoid having to handle failure below */
 
        if (first_time) {
-               size_t len;
-
-               len = strnlen(ondisk->object_prefix,
-                               sizeof (ondisk->object_prefix));
-               object_prefix = kmalloc(len + 1, GFP_KERNEL);
+               object_prefix = kstrndup(ondisk->object_prefix,
+                                        sizeof(ondisk->object_prefix),
+                                        GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
-               memcpy(object_prefix, ondisk->object_prefix, len);
-               object_prefix[len] = '\0';
        }
 
        /* Allocate the snapshot context and fill it in */
@@ -1051,12 +1071,7 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
-               header->crypt_type = ondisk->options.crypt_type;
-               header->comp_type = ondisk->options.comp_type;
-               /* The rest aren't used for format 1 images */
-               header->stripe_unit = 0;
-               header->stripe_count = 0;
-               header->features = 0;
+               rbd_init_layout(rbd_dev);
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
@@ -1232,42 +1247,9 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
        rbd_dev->mapping.features = 0;
 }
 
-static void rbd_segment_name_free(const char *name)
-{
-       /* The explicit cast here is needed to drop the const qualifier */
-
-       kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
-static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
-{
-       char *name;
-       u64 segment;
-       int ret;
-       char *name_format;
-
-       name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
-       if (!name)
-               return NULL;
-       segment = offset >> rbd_dev->header.obj_order;
-       name_format = "%s.%012llx";
-       if (rbd_dev->image_format == 2)
-               name_format = "%s.%016llx";
-       ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
-                       rbd_dev->header.object_prefix, segment);
-       if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
-               pr_err("error formatting segment name for #%llu (%d)\n",
-                       segment, ret);
-               rbd_segment_name_free(name);
-               name = NULL;
-       }
-
-       return name;
-}
-
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
-       u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
+       u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
 
        return offset & (segment_size - 1);
 }
@@ -1275,7 +1257,7 @@ static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
 {
-       u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
+       u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
 
        offset &= segment_size - 1;
 
@@ -1286,14 +1268,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
        return length;
 }
 
-/*
- * returns the size of an object in the image
- */
-static u64 rbd_obj_bytes(struct rbd_image_header *header)
-{
-       return 1 << header->obj_order;
-}
-
 /*
  * bio helpers
  */
@@ -1623,7 +1597,9 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
 {
        struct ceph_osd_request *osd_req = obj_request->osd_req;
 
-       dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
+       dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
+            obj_request, obj_request->object_no, obj_request->offset,
+            obj_request->length, osd_req);
        if (obj_request_img_data_test(obj_request)) {
                WARN_ON(obj_request->callback != rbd_img_obj_callback);
                rbd_img_request_get(obj_request->img_request);
@@ -1631,44 +1607,6 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
 }
 
-static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
-{
-       dout("%s %p\n", __func__, obj_request);
-       ceph_osdc_cancel_request(obj_request->osd_req);
-}
-
-/*
- * Wait for an object request to complete.  If interrupted, cancel the
- * underlying osd request.
- *
- * @timeout: in jiffies, 0 means "wait forever"
- */
-static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
-                                 unsigned long timeout)
-{
-       long ret;
-
-       dout("%s %p\n", __func__, obj_request);
-       ret = wait_for_completion_interruptible_timeout(
-                                       &obj_request->completion,
-                                       ceph_timeout_jiffies(timeout));
-       if (ret <= 0) {
-               if (ret == 0)
-                       ret = -ETIMEDOUT;
-               rbd_obj_request_end(obj_request);
-       } else {
-               ret = 0;
-       }
-
-       dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
-       return ret;
-}
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
-       return __rbd_obj_request_wait(obj_request, 0);
-}
-
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1955,8 +1893,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
                rbd_osd_call_callback(obj_request);
                break;
        default:
-               rbd_warn(NULL, "%s: unsupported op %hu",
-                       obj_request->object_name, (unsigned short) opcode);
+               rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
+                        obj_request->object_no, opcode);
                break;
        }
 
@@ -1980,6 +1918,40 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
        osd_req->r_data_offset = obj_request->offset;
 }
 
+static struct ceph_osd_request *
+__rbd_osd_req_create(struct rbd_device *rbd_dev,
+                    struct ceph_snap_context *snapc,
+                    int num_ops, unsigned int flags,
+                    struct rbd_obj_request *obj_request)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct ceph_osd_request *req;
+       const char *name_format = rbd_dev->image_format == 1 ?
+                                     RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
+
+       req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
+       if (!req)
+               return NULL;    /* request allocation failed */
+
+       req->r_flags = flags;   /* CEPH_OSD_FLAG_* from the caller */
+       req->r_callback = rbd_osd_req_callback;
+       req->r_priv = obj_request; /* presumably read back in the callback */
+       /* Target: the layout's pool, object named from prefix + object_no. */
+       req->r_base_oloc.pool = rbd_dev->layout.pool_id;
+       if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
+                       rbd_dev->header.object_prefix, obj_request->object_no))
+               goto err_req;   /* could not format the object name */
+
+       if (ceph_osdc_alloc_messages(req, GFP_NOIO))
+               goto err_req;   /* message allocation failed */
+
+       return req;
+
+err_req:
+       ceph_osdc_put_request(req);     /* drop the request allocated above */
+       return NULL;
+}
+
 /*
  * Create an osd request.  A read request has one osd op (read).
  * A write request has either one (watch) or two (hint+write) osd ops.
@@ -1993,8 +1965,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_obj_request *obj_request)
 {
        struct ceph_snap_context *snapc = NULL;
-       struct ceph_osd_client *osdc;
-       struct ceph_osd_request *osd_req;
 
        if (obj_request_img_data_test(obj_request) &&
                (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
@@ -2009,35 +1979,9 @@ static struct ceph_osd_request *rbd_osd_req_create(
 
        rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
 
-       /* Allocate and initialize the request, for the num_ops ops */
-
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
-                                         GFP_NOIO);
-       if (!osd_req)
-               goto fail;
-
-       if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
-               osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-       else
-               osd_req->r_flags = CEPH_OSD_FLAG_READ;
-
-       osd_req->r_callback = rbd_osd_req_callback;
-       osd_req->r_priv = obj_request;
-
-       osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
-       if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
-                            obj_request->object_name))
-               goto fail;
-
-       if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
-               goto fail;
-
-       return osd_req;
-
-fail:
-       ceph_osdc_put_request(osd_req);
-       return NULL;
+       return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
+           (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
+           CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
 }
 
 /*
@@ -2050,10 +1994,6 @@ static struct ceph_osd_request *
 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
-       struct ceph_snap_context *snapc;
-       struct rbd_device *rbd_dev;
-       struct ceph_osd_client *osdc;
-       struct ceph_osd_request *osd_req;
        int num_osd_ops = 3;
 
        rbd_assert(obj_request_img_data_test(obj_request));
@@ -2065,77 +2005,34 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
        if (img_request_discard_test(img_request))
                num_osd_ops = 2;
 
-       /* Allocate and initialize the request, for all the ops */
-
-       snapc = img_request->snapc;
-       rbd_dev = img_request->rbd_dev;
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
-                                               false, GFP_NOIO);
-       if (!osd_req)
-               goto fail;
-
-       osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-       osd_req->r_callback = rbd_osd_req_callback;
-       osd_req->r_priv = obj_request;
-
-       osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
-       if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
-                            obj_request->object_name))
-               goto fail;
-
-       if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
-               goto fail;
-
-       return osd_req;
-
-fail:
-       ceph_osdc_put_request(osd_req);
-       return NULL;
+       return __rbd_osd_req_create(img_request->rbd_dev,
+                                   img_request->snapc, num_osd_ops,
+                                   CEPH_OSD_FLAG_WRITE, obj_request);
 }
 
-
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
        ceph_osdc_put_request(osd_req);
 }
 
-/* object_name is assumed to be a non-null pointer and NUL-terminated */
-
-static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
-                                               u64 offset, u64 length,
-                                               enum obj_request_type type)
+static struct rbd_obj_request *
+rbd_obj_request_create(enum obj_request_type type)
 {
        struct rbd_obj_request *obj_request;
-       size_t size;
-       char *name;
 
        rbd_assert(obj_request_type_valid(type));
 
-       size = strlen(object_name) + 1;
-       name = kmalloc(size, GFP_NOIO);
-       if (!name)
-               return NULL;
-
        obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
-       if (!obj_request) {
-               kfree(name);
+       if (!obj_request)
                return NULL;
-       }
 
-       obj_request->object_name = memcpy(name, object_name, size);
-       obj_request->offset = offset;
-       obj_request->length = length;
-       obj_request->flags = 0;
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);
 
-       dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
-               offset, length, (int)type, obj_request);
-
+       dout("%s %p\n", __func__, obj_request);
        return obj_request;
 }
 
@@ -2170,8 +2067,6 @@ static void rbd_obj_request_destroy(struct kref *kref)
                break;
        }
 
-       kfree(obj_request->object_name);
-       obj_request->object_name = NULL;
        kmem_cache_free(rbd_obj_request_cache, obj_request);
 }
 
@@ -2546,22 +2441,18 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 
        while (resid) {
                struct ceph_osd_request *osd_req;
-               const char *object_name;
-               u64 offset;
-               u64 length;
+               u64 object_no = img_offset >> rbd_dev->header.obj_order;
+               u64 offset = rbd_segment_offset(rbd_dev, img_offset);
+               u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
 
-               object_name = rbd_segment_name(rbd_dev, img_offset);
-               if (!object_name)
-                       goto out_unwind;
-               offset = rbd_segment_offset(rbd_dev, img_offset);
-               length = rbd_segment_length(rbd_dev, img_offset, resid);
-               obj_request = rbd_obj_request_create(object_name,
-                                               offset, length, type);
-               /* object request has its own copy of the object name */
-               rbd_segment_name_free(object_name);
+               obj_request = rbd_obj_request_create(type);
                if (!obj_request)
                        goto out_unwind;
 
+               obj_request->object_no = object_no;
+               obj_request->offset = offset;
+               obj_request->length = length;
+
                /*
                 * set obj_request->img_request before creating the
                 * osd_request so that it gets the right snapc
@@ -2771,7 +2662,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
         * child image to which the original request was to be sent.
         */
        img_offset = obj_request->img_offset - obj_request->offset;
-       length = (u64)1 << rbd_dev->header.obj_order;
+       length = rbd_obj_bytes(&rbd_dev->header);
 
        /*
         * There is no defined parent data beyond the parent
@@ -2900,11 +2791,12 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
        size_t size;
        int ret;
 
-       stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
-                                             OBJ_REQUEST_PAGES);
+       stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
        if (!stat_request)
                return -ENOMEM;
 
+       stat_request->object_no = obj_request->object_no;
+
        stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
                                                   stat_request);
        if (!stat_request->osd_req) {
@@ -3983,17 +3875,17 @@ out:
  * returned in the outbound buffer, or a negative error code.
  */
 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
-                            const char *object_name,
-                            const char *class_name,
+                            struct ceph_object_id *oid,
+                            struct ceph_object_locator *oloc,
                             const char *method_name,
                             const void *outbound,
                             size_t outbound_size,
                             void *inbound,
                             size_t inbound_size)
 {
-       struct rbd_obj_request *obj_request;
-       struct page **pages;
-       u32 page_count;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct page *req_page = NULL;
+       struct page *reply_page;
        int ret;
 
        /*
@@ -4003,61 +3895,35 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
         * method.  Currently if this is present it will be a
         * snapshot id.
         */
-       page_count = (u32)calc_pages_for(0, inbound_size);
-       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
-
-       ret = -ENOMEM;
-       obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
-                                                       OBJ_REQUEST_PAGES);
-       if (!obj_request)
-               goto out;
+       if (outbound) {
+               if (outbound_size > PAGE_SIZE)
+                       return -E2BIG;
 
-       obj_request->pages = pages;
-       obj_request->page_count = page_count;
-
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-                                                 obj_request);
-       if (!obj_request->osd_req)
-               goto out;
-
-       osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
-                                       class_name, method_name);
-       if (outbound_size) {
-               struct ceph_pagelist *pagelist;
-
-               pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
-               if (!pagelist)
-                       goto out;
+               req_page = alloc_page(GFP_KERNEL);
+               if (!req_page)
+                       return -ENOMEM;
 
-               ceph_pagelist_init(pagelist);
-               ceph_pagelist_append(pagelist, outbound, outbound_size);
-               osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
-                                               pagelist);
+               memcpy(page_address(req_page), outbound, outbound_size);
        }
-       osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
-                                       obj_request->pages, inbound_size,
-                                       0, false, false);
-
-       rbd_obj_request_submit(obj_request);
-       ret = rbd_obj_request_wait(obj_request);
-       if (ret)
-               goto out;
 
-       ret = obj_request->result;
-       if (ret < 0)
-               goto out;
+       reply_page = alloc_page(GFP_KERNEL);
+       if (!reply_page) {
+               if (req_page)
+                       __free_page(req_page);
+               return -ENOMEM;
+       }
 
-       rbd_assert(obj_request->xferred < (u64)INT_MAX);
-       ret = (int)obj_request->xferred;
-       ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
-out:
-       if (obj_request)
-               rbd_obj_request_put(obj_request);
-       else
-               ceph_release_page_vector(pages, page_count);
+       ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
+                            CEPH_OSD_FLAG_READ, req_page, outbound_size,
+                            reply_page, &inbound_size);
+       if (!ret) {
+               memcpy(inbound, page_address(reply_page), inbound_size);
+               ret = inbound_size;
+       }
 
+       if (req_page)
+               __free_page(req_page);
+       __free_page(reply_page);
        return ret;
 }
 
@@ -4256,63 +4122,46 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 }
 
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
-                               const char *object_name,
-                               u64 offset, u64 length, void *buf)
+                            struct ceph_object_id *oid,
+                            struct ceph_object_locator *oloc,
+                            void *buf, int buf_len)
 
 {
-       struct rbd_obj_request *obj_request;
-       struct page **pages = NULL;
-       u32 page_count;
-       size_t size;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct ceph_osd_request *req;
+       struct page **pages;
+       int num_pages = calc_pages_for(0, buf_len);
        int ret;
 
-       page_count = (u32) calc_pages_for(offset, length);
-       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
-
-       ret = -ENOMEM;
-       obj_request = rbd_obj_request_create(object_name, offset, length,
-                                                       OBJ_REQUEST_PAGES);
-       if (!obj_request)
-               goto out;
-
-       obj_request->pages = pages;
-       obj_request->page_count = page_count;
-
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-                                                 obj_request);
-       if (!obj_request->osd_req)
-               goto out;
+       req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+       if (!req)
+               return -ENOMEM;
 
-       osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
-                                       offset, length, 0, 0);
-       osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
-                                       obj_request->pages,
-                                       obj_request->length,
-                                       obj_request->offset & ~PAGE_MASK,
-                                       false, false);
+       ceph_oid_copy(&req->r_base_oid, oid);
+       ceph_oloc_copy(&req->r_base_oloc, oloc);
+       req->r_flags = CEPH_OSD_FLAG_READ;
 
-       rbd_obj_request_submit(obj_request);
-       ret = rbd_obj_request_wait(obj_request);
+       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
        if (ret)
-               goto out;
+               goto out_req;
 
-       ret = obj_request->result;
-       if (ret < 0)
-               goto out;
+       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+       if (IS_ERR(pages)) {
+               ret = PTR_ERR(pages);
+               goto out_req;
+       }
 
-       rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
-       size = (size_t) obj_request->xferred;
-       ceph_copy_from_page_vector(pages, buf, 0, size);
-       rbd_assert(size <= (size_t)INT_MAX);
-       ret = (int)size;
-out:
-       if (obj_request)
-               rbd_obj_request_put(obj_request);
-       else
-               ceph_release_page_vector(pages, page_count);
+       osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
+       osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
+                                        true);
+
+       ceph_osdc_start_request(osdc, req, false);
+       ret = ceph_osdc_wait_request(osdc, req);
+       if (ret >= 0)
+               ceph_copy_from_page_vector(pages, buf, 0, ret);
 
+out_req:
+       ceph_osdc_put_request(req);
        return ret;
 }
 
@@ -4348,8 +4197,8 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
                if (!ondisk)
                        return -ENOMEM;
 
-               ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
-                                      0, size, ondisk);
+               ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
+                                       &rbd_dev->header_oloc, ondisk, size);
                if (ret < 0)
                        goto out;
                if ((size_t)ret < size) {
@@ -4781,7 +4630,7 @@ static const struct attribute_group *rbd_attr_groups[] = {
 
 static void rbd_dev_release(struct device *dev);
 
-static struct device_type rbd_device_type = {
+static const struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_dev_release,
@@ -4876,8 +4725,9 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
        INIT_LIST_HEAD(&rbd_dev->node);
        init_rwsem(&rbd_dev->header_rwsem);
 
+       rbd_dev->header.data_pool_id = CEPH_NOPOOL;
        ceph_oid_init(&rbd_dev->header_oid);
-       ceph_oloc_init(&rbd_dev->header_oloc);
+       rbd_dev->header_oloc.pool = spec->pool_id;
 
        mutex_init(&rbd_dev->watch_mutex);
        rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
@@ -4899,12 +4749,6 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->rbd_client = rbdc;
        rbd_dev->spec = spec;
 
-       rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
-       rbd_dev->layout.stripe_count = 1;
-       rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
-       rbd_dev->layout.pool_id = spec->pool_id;
-       RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
-
        return rbd_dev;
 }
 
@@ -4970,10 +4814,10 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };
 
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_size",
-                               &snapid, sizeof (snapid),
-                               &size_buf, sizeof (size_buf));
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_size",
+                                 &snapid, sizeof(snapid),
+                                 &size_buf, sizeof(size_buf));
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
@@ -5010,9 +4854,9 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
        if (!reply_buf)
                return -ENOMEM;
 
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_object_prefix", NULL, 0,
-                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_object_prefix",
+                                 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
@@ -5045,10 +4889,10 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
        u64 unsup;
        int ret;
 
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_features",
-                               &snapid, sizeof (snapid),
-                               &features_buf, sizeof (features_buf));
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_features",
+                                 &snapid, sizeof(snapid),
+                                 &features_buf, sizeof(features_buf));
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
@@ -5107,10 +4951,9 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        }
 
        snapid = cpu_to_le64(rbd_dev->spec->snap_id);
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_parent",
-                               &snapid, sizeof (snapid),
-                               reply_buf, size);
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_parent",
+                                 &snapid, sizeof(snapid), reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out_err;
@@ -5210,9 +5053,9 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
        u64 stripe_count;
        int ret;
 
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_stripe_unit_count", NULL, 0,
-                               (char *)&striping_info_buf, size);
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                               &rbd_dev->header_oloc, "get_stripe_unit_count",
+                               NULL, 0, &striping_info_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
@@ -5226,7 +5069,7 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
         * out, and only fail if the image has non-default values.
         */
        ret = -EINVAL;
-       obj_size = (u64)1 << rbd_dev->header.obj_order;
+       obj_size = rbd_obj_bytes(&rbd_dev->header);
        p = &striping_info_buf;
        stripe_unit = ceph_decode_64(&p);
        if (stripe_unit != obj_size) {
@@ -5247,8 +5090,27 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
        return 0;
 }
 
+static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
+{ /* Invoke "get_data_pool" on the header object and record the result in rbd_dev->header.data_pool_id; returns 0 or a negative error. */
+       __le64 data_pool_id; /* raw reply buffer; converted with le64_to_cpu() before being stored */
+       int ret;
+
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_data_pool",
+                                 NULL, 0, &data_pool_id, sizeof(data_pool_id)); /* NULL/0: no input payload is sent */
+       if (ret < 0) /* a negative result is handed back to the caller unchanged */
+               return ret;
+       if (ret < sizeof(data_pool_id)) /* NOTE(review): treats a non-negative ret as the reply length -- confirm against rbd_obj_method_sync(); a short reply is rejected */
+               return -EBADMSG;
+
+       rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
+       WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); /* only warns: the function still returns 0 in this case */
+       return 0;
+}
+
 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 {
+       CEPH_DEFINE_OID_ONSTACK(oid);
        size_t image_id_size;
        char *image_id;
        void *p;
@@ -5276,10 +5138,10 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
        if (!reply_buf)
                goto out;
 
-       ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
-                               "rbd", "dir_get_name",
-                               image_id, image_id_size,
-                               reply_buf, size);
+       ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
+       ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
+                                 "dir_get_name", image_id, image_id_size,
+                                 reply_buf, size);
        if (ret < 0)
                goto out;
        p = reply_buf;
@@ -5458,9 +5320,9 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
        if (!reply_buf)
                return -ENOMEM;
 
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_snapcontext", NULL, 0,
-                               reply_buf, size);
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_snapcontext",
+                                 NULL, 0, reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
@@ -5523,10 +5385,9 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                return ERR_PTR(-ENOMEM);
 
        snapid = cpu_to_le64(snap_id);
-       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-                               "rbd", "get_snapshot_name",
-                               &snapid, sizeof (snapid),
-                               reply_buf, size);
+       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, "get_snapshot_name",
+                                 &snapid, sizeof(snapid), reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0) {
                snap_name = ERR_PTR(ret);
@@ -5833,7 +5694,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 {
        int ret;
        size_t size;
-       char *object_name;
+       CEPH_DEFINE_OID_ONSTACK(oid);
        void *response;
        char *image_id;
 
@@ -5853,12 +5714,12 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
-       size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
-       object_name = kmalloc(size, GFP_NOIO);
-       if (!object_name)
-               return -ENOMEM;
-       sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
-       dout("rbd id object name is %s\n", object_name);
+       ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
+                              rbd_dev->spec->image_name);
+       if (ret)
+               return ret;
+
+       dout("rbd id object name is %s\n", oid.name);
 
        /* Response will be an encoded string, which includes a length */
 
@@ -5871,9 +5732,9 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 
        /* If it doesn't exist we'll assume it's a format 1 image */
 
-       ret = rbd_obj_method_sync(rbd_dev, object_name,
-                               "rbd", "get_id", NULL, 0,
-                               response, RBD_IMAGE_ID_LEN_MAX);
+       ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
+                                 "get_id", NULL, 0,
+                                 response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
@@ -5896,8 +5757,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        }
 out:
        kfree(response);
-       kfree(object_name);
-
+       ceph_oid_destroy(&oid);
        return ret;
 }
 
@@ -5944,14 +5804,20 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
                if (ret < 0)
                        goto out_err;
        }
-       /* No support for crypto and compression type format 2 images */
 
+       if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
+               ret = rbd_dev_v2_data_pool(rbd_dev);
+               if (ret)
+                       goto out_err;
+       }
+
+       rbd_init_layout(rbd_dev);
        return 0;
+
 out_err:
        rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;
-
        return ret;
 }
 
@@ -6077,8 +5943,6 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
        /* Record the header object name for this rbd image. */
 
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-
-       rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
        if (rbd_dev->image_format == 1)
                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
                                       spec->image_name, RBD_SUFFIX);
@@ -6471,27 +6335,16 @@ static int rbd_slab_init(void)
        if (!rbd_obj_request_cache)
                goto out_err;
 
-       rbd_assert(!rbd_segment_name_cache);
-       rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
-                                       CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
-       if (rbd_segment_name_cache)
-               return 0;
-out_err:
-       kmem_cache_destroy(rbd_obj_request_cache);
-       rbd_obj_request_cache = NULL;
+       return 0;
 
+out_err:
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
-
        return -ENOMEM;
 }
 
 static void rbd_slab_exit(void)
 {
-       rbd_assert(rbd_segment_name_cache);
-       kmem_cache_destroy(rbd_segment_name_cache);
-       rbd_segment_name_cache = NULL;
-
        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;