X-Git-Url: https://git.karo-electronics.de/?a=blobdiff_plain;f=drivers%2Fblock%2Frbd.c;h=f556f8a8b3f9b476949c6133f39778c49502e380;hb=0eefd470f034cc18349fa1a9e4fda000e963c4e3;hp=6c81a4c040b987e75d4a8b0bb5e4ff65e2108dba;hpb=527c680f7c36ff17d49efc99632232dba3549c51;p=karo-tx-linux.git diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6c81a4c040b9..c34719c917b1 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -52,13 +52,6 @@ #define SECTOR_SHIFT 9 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) -/* It might be useful to have these defined elsewhere */ - -#define U8_MAX ((u8) (~0U)) -#define U16_MAX ((u16) (~0U)) -#define U32_MAX ((u32) (~0U)) -#define U64_MAX ((u64) (~0ULL)) - #define RBD_DRV_NAME "rbd" #define RBD_DRV_NAME_LONG "rbd (rados block device)" @@ -80,11 +73,14 @@ /* Feature bits */ -#define RBD_FEATURE_LAYERING 1 +#define RBD_FEATURE_LAYERING (1<<0) +#define RBD_FEATURE_STRIPINGV2 (1<<1) +#define RBD_FEATURES_ALL \ + (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) /* Features supported by this (client software) implementation. */ -#define RBD_FEATURES_ALL (0) +#define RBD_FEATURES_SUPPORTED (0) /* * An RBD device name will be "rbd#", where the "rbd" comes from @@ -174,13 +170,44 @@ enum obj_request_type { OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES }; +enum obj_req_flags { + OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ + OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ + OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ + OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ +}; + struct rbd_obj_request { const char *object_name; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ + unsigned long flags; - struct rbd_img_request *img_request; - struct list_head links; /* img_request->obj_requests */ + /* + * An object request associated with an image will have its + * img_data flag set; a standalone object request will not. + * + * A standalone object request will have which == BAD_WHICH + * and a null obj_request pointer. + * + * An object request initiated in support of a layered image + * object (to check for its existence before a write) will + * have which == BAD_WHICH and a non-null obj_request pointer. + * + * Finally, an object request for rbd image data will have + * which != BAD_WHICH, and will have a non-null img_request + * pointer. The value of which will be in the range + * 0..(img_request->obj_request_count-1). + */ + union { + struct rbd_obj_request *obj_request; /* STAT op */ + struct { + struct rbd_img_request *img_request; + u64 img_offset; + /* links for img_request->obj_requests list */ + struct list_head links; + }; + }; u32 which; /* posn image request list */ enum obj_request_type type; @@ -191,13 +218,13 @@ struct rbd_obj_request { u32 page_count; }; }; + struct page **copyup_pages; struct ceph_osd_request *osd_req; u64 xferred; /* bytes transferred */ u64 version; int result; - atomic_t done; rbd_obj_callback_t callback; struct completion completion; @@ -205,19 +232,31 @@ struct rbd_obj_request { struct kref kref; }; +enum img_req_flags { + IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ + IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ + IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ +}; + struct rbd_img_request { - struct request *rq; struct rbd_device *rbd_dev; u64 offset; /* starting image byte offset */ u64 length; /* byte count from offset */ - bool write_request; /* false for read */ + unsigned long flags; union { + u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ - u64 snap_id; /* for reads */ }; + union { + struct request *rq; /* block request */ + struct rbd_obj_request *obj_request; /* obj req initiator */ + }; + struct page **copyup_pages; spinlock_t completion_lock;/* protects next_completion */ u32 next_completion; rbd_img_callback_t callback; + u64 xferred;/* aggregate bytes transferred */ + int result; /* first nonzero obj_request result */ u32 obj_request_count; struct list_head obj_requests; /* rbd_obj_request structs */ @@ -276,6 +315,7 @@ struct rbd_device { struct rbd_spec *parent_spec; u64 parent_overlap; + struct rbd_device *parent; /* protects updating the header */ struct rw_semaphore header_rwsem; @@ -312,6 +352,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); static LIST_HEAD(rbd_client_list); /* clients */ static DEFINE_SPINLOCK(rbd_client_list_lock); +static int rbd_img_request_submit(struct rbd_img_request *img_request); + static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); @@ -322,6 +364,7 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count); +static int rbd_dev_probe(struct rbd_device *rbd_dev); static struct bus_attribute rbd_bus_attrs[] = { __ATTR(add, S_IWUSR, NULL, rbd_add), @@ -383,6 +426,9 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) # define rbd_assert(expr) ((void) 0) #endif /* !RBD_DEBUG */ +static void rbd_img_parent_read(struct rbd_obj_request *obj_request); +static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); + static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); @@ -484,6 +530,13 @@ out_opt: return ERR_PTR(ret); } +static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc) +{ + kref_get(&rbdc->kref); + + return rbdc; +} + /* * Find a ceph client with specific addr and configuration. If * found, bump its reference count. @@ -499,7 +552,8 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) spin_lock(&rbd_client_list_lock); list_for_each_entry(client_node, &rbd_client_list, node) { if (!ceph_compare_options(ceph_opts, client_node->client)) { - kref_get(&client_node->kref); + __rbd_get_client(client_node); + found = true; break; } @@ -920,6 +974,37 @@ static void zero_bio_chain(struct bio *chain, int start_ofs) } } +/* + * similar to zero_bio_chain(), zeros data defined by a page array, + * starting at the given byte offset from the start of the array and + * continuing up to the given end offset. The pages array is + * assumed to be big enough to hold all bytes up to the end. + */ +static void zero_pages(struct page **pages, u64 offset, u64 end) +{ + struct page **page = &pages[offset >> PAGE_SHIFT]; + + rbd_assert(end > offset); + rbd_assert(end - offset <= (u64)SIZE_MAX); + while (offset < end) { + size_t page_offset; + size_t length; + unsigned long flags; + void *kaddr; + + page_offset = (size_t)(offset & ~PAGE_MASK); + length = min(PAGE_SIZE - page_offset, (size_t)(end - offset)); + local_irq_save(flags); + kaddr = kmap_atomic(*page); + memset(kaddr + page_offset, 0, length); + kunmap_atomic(kaddr); + local_irq_restore(flags); + + offset += length; + page++; + } +} + /* * Clone a portion of a bio, starting at the given byte offset * and continuing for the number of bytes indicated. @@ -1064,6 +1149,77 @@ out_err: return NULL; } +/* + * The default/initial value for all object request flags is 0. For + * each flag, once its value is set to 1 it is never reset to 0 + * again. + */ +static void obj_request_img_data_set(struct rbd_obj_request *obj_request) +{ + if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { + struct rbd_device *rbd_dev; + + rbd_dev = obj_request->img_request->rbd_dev; + rbd_warn(rbd_dev, "obj_request %p already marked img_data\n", + obj_request); + } +} + +static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; +} + +static void obj_request_done_set(struct rbd_obj_request *obj_request) +{ + if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { + struct rbd_device *rbd_dev = NULL; + + if (obj_request_img_data_test(obj_request)) + rbd_dev = obj_request->img_request->rbd_dev; + rbd_warn(rbd_dev, "obj_request %p already marked done\n", + obj_request); + } +} + +static bool obj_request_done_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; +} + +/* + * This sets the KNOWN flag after (possibly) setting the EXISTS + * flag. The latter is set based on the "exists" value provided. + * + * Note that for our purposes once an object exists it never goes + * away again. It's possible that the response from two existence + * checks are separated by the creation of the target object, and + * the first ("doesn't exist") response arrives *after* the second + * ("does exist"). In that case we ignore the second one. + */ +static void obj_request_existence_set(struct rbd_obj_request *obj_request, + bool exists) +{ + if (exists) + set_bit(OBJ_REQ_EXISTS, &obj_request->flags); + set_bit(OBJ_REQ_KNOWN, &obj_request->flags); + smp_mb(); +} + +static bool obj_request_known_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; +} + +static bool obj_request_exists_test(struct rbd_obj_request *obj_request) +{ + smp_mb(); + return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; +} + static void rbd_obj_request_get(struct rbd_obj_request *obj_request) { dout("%s: obj %p (was %d)\n", __func__, obj_request, @@ -1101,9 +1257,11 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, { rbd_assert(obj_request->img_request == NULL); - rbd_obj_request_get(obj_request); + /* Image request now owns object's original reference */ obj_request->img_request = img_request; obj_request->which = img_request->obj_request_count; + rbd_assert(!obj_request_img_data_test(obj_request)); + obj_request_img_data_set(obj_request); rbd_assert(obj_request->which != BAD_WHICH); img_request->obj_request_count++; list_add_tail(&obj_request->links, &img_request->obj_requests); @@ -1123,6 +1281,7 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, img_request->obj_request_count--; rbd_assert(obj_request->which == img_request->obj_request_count); obj_request->which = BAD_WHICH; + rbd_assert(obj_request_img_data_test(obj_request)); rbd_assert(obj_request->img_request == img_request); obj_request->img_request = NULL; obj_request->callback = NULL; @@ -1141,76 +1300,6 @@ static bool obj_request_type_valid(enum obj_request_type type) } } -static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...) -{ - struct ceph_osd_req_op *op; - va_list args; - size_t size; - - op = kzalloc(sizeof (*op), GFP_NOIO); - if (!op) - return NULL; - op->op = opcode; - va_start(args, opcode); - switch (opcode) { - case CEPH_OSD_OP_READ: - case CEPH_OSD_OP_WRITE: - /* rbd_osd_req_op_create(READ, offset, length) */ - /* rbd_osd_req_op_create(WRITE, offset, length) */ - op->extent.offset = va_arg(args, u64); - op->extent.length = va_arg(args, u64); - if (opcode == CEPH_OSD_OP_WRITE) - op->payload_len = op->extent.length; - break; - case CEPH_OSD_OP_STAT: - break; - case CEPH_OSD_OP_CALL: - /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */ - op->cls.class_name = va_arg(args, char *); - size = strlen(op->cls.class_name); - rbd_assert(size <= (size_t) U8_MAX); - op->cls.class_len = size; - op->payload_len = size; - - op->cls.method_name = va_arg(args, char *); - size = strlen(op->cls.method_name); - rbd_assert(size <= (size_t) U8_MAX); - op->cls.method_len = size; - op->payload_len += size; - - op->cls.argc = 0; - op->cls.indata = va_arg(args, void *); - size = va_arg(args, size_t); - rbd_assert(size <= (size_t) U32_MAX); - op->cls.indata_len = (u32) size; - op->payload_len += size; - break; - case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_WATCH: - /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */ - /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */ - op->watch.cookie = va_arg(args, u64); - op->watch.ver = va_arg(args, u64); - op->watch.ver = cpu_to_le64(op->watch.ver); - if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int)) - op->watch.flag = (u8) 1; - break; - default: - rbd_warn(NULL, "unsupported opcode %hu\n", opcode); - kfree(op); - op = NULL; - break; - } - va_end(args); - - return op; -} - -static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op) -{ - kfree(op); -} - static int rbd_obj_request_submit(struct ceph_osd_client *osdc, struct rbd_obj_request *obj_request) { @@ -1221,7 +1310,24 @@ static int rbd_obj_request_submit(struct ceph_osd_client *osdc, static void rbd_img_request_complete(struct rbd_img_request *img_request) { + dout("%s: img %p\n", __func__, img_request); + + /* + * If no error occurred, compute the aggregate transfer + * count for the image request. We could instead use + * atomic64_cmpxchg() to update it as each object request + * completes; not clear which way is better off hand. + */ + if (!img_request->result) { + struct rbd_obj_request *obj_request; + u64 xferred = 0; + + for_each_obj_request(img_request, obj_request) + xferred += obj_request->xferred; + img_request->xferred = xferred; + } + if (img_request->callback) img_request->callback(img_request); else @@ -1237,31 +1343,79 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) return wait_for_completion_interruptible(&obj_request->completion); } -static void obj_request_done_init(struct rbd_obj_request *obj_request) +/* + * The default/initial value for all image request flags is 0. Each + * is conditionally set to 1 at image request initialization time + * and currently never change thereafter. + */ +static void img_request_write_set(struct rbd_img_request *img_request) { - atomic_set(&obj_request->done, 0); - smp_wmb(); + set_bit(IMG_REQ_WRITE, &img_request->flags); + smp_mb(); } -static void obj_request_done_set(struct rbd_obj_request *obj_request) +static bool img_request_write_test(struct rbd_img_request *img_request) { - int done; + smp_mb(); + return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; +} - done = atomic_inc_return(&obj_request->done); - if (done > 1) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev; +static void img_request_child_set(struct rbd_img_request *img_request) +{ + set_bit(IMG_REQ_CHILD, &img_request->flags); + smp_mb(); +} - rbd_dev = img_request ? img_request->rbd_dev : NULL; - rbd_warn(rbd_dev, "obj_request %p was already done\n", - obj_request); - } +static bool img_request_child_test(struct rbd_img_request *img_request) +{ + smp_mb(); + return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; } -static bool obj_request_done_test(struct rbd_obj_request *obj_request) +static void img_request_layered_set(struct rbd_img_request *img_request) +{ + set_bit(IMG_REQ_LAYERED, &img_request->flags); + smp_mb(); +} + +static bool img_request_layered_test(struct rbd_img_request *img_request) { smp_mb(); - return atomic_read(&obj_request->done) != 0; + return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; +} + +static void +rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) +{ + u64 xferred = obj_request->xferred; + u64 length = obj_request->length; + + dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, + obj_request, obj_request->img_request, obj_request->result, + xferred, length); + /* + * ENOENT means a hole in the image. We zero-fill the + * entire length of the request. A short read also implies + * zero-fill to the end of the request. Either way we + * update the xferred count to indicate the whole request + * was satisfied. + */ + rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); + if (obj_request->result == -ENOENT) { + if (obj_request->type == OBJ_REQUEST_BIO) + zero_bio_chain(obj_request->bio_list, 0); + else + zero_pages(obj_request->pages, 0, length); + obj_request->result = 0; + obj_request->xferred = length; + } else if (xferred < length && !obj_request->result) { + if (obj_request->type == OBJ_REQUEST_BIO) + zero_bio_chain(obj_request->bio_list, xferred); + else + zero_pages(obj_request->pages, xferred, length); + obj_request->xferred = length; + } + obj_request_done_set(obj_request); } static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) @@ -1282,25 +1436,26 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { - dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, - obj_request->result, obj_request->xferred, obj_request->length); - /* - * ENOENT means a hole in the object. We zero-fill the - * entire length of the request. A short read also implies - * zero-fill to the end of the request. Either way we - * update the xferred count to indicate the whole request - * was satisfied. - */ - if (obj_request->result == -ENOENT) { - zero_bio_chain(obj_request->bio_list, 0); - obj_request->result = 0; - obj_request->xferred = obj_request->length; - } else if (obj_request->xferred < obj_request->length && - !obj_request->result) { - zero_bio_chain(obj_request->bio_list, obj_request->xferred); - obj_request->xferred = obj_request->length; + struct rbd_img_request *img_request = NULL; + bool layered = false; + + if (obj_request_img_data_test(obj_request)) { + img_request = obj_request->img_request; + layered = img_request && img_request_layered_test(img_request); + } else { + img_request = NULL; + layered = false; } - obj_request_done_set(obj_request); + + dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, + obj_request, img_request, obj_request->result, + obj_request->xferred, obj_request->length); + if (layered && obj_request->result == -ENOENT) + rbd_img_parent_read(obj_request); + else if (img_request) + rbd_img_obj_request_read_callback(obj_request); + else + obj_request_done_set(obj_request); } static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) @@ -1308,9 +1463,8 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) dout("%s: obj %p result %d %llu\n", __func__, obj_request, obj_request->result, obj_request->length); /* - * There is no such thing as a successful short write. - * Our xferred value is the number of bytes transferred - * back. Set it to our originally-requested length. + * There is no such thing as a successful short write. Set + * it to our originally-requested length. */ obj_request->xferred = obj_request->length; obj_request_done_set(obj_request); @@ -1334,22 +1488,26 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); rbd_assert(osd_req == obj_request->osd_req); - rbd_assert(!!obj_request->img_request ^ - (obj_request->which == BAD_WHICH)); + if (obj_request_img_data_test(obj_request)) { + rbd_assert(obj_request->img_request); + rbd_assert(obj_request->which != BAD_WHICH); + } else { + rbd_assert(obj_request->which == BAD_WHICH); + } if (osd_req->r_result < 0) obj_request->result = osd_req->r_result; obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); - WARN_ON(osd_req->r_num_ops != 1); /* For now */ + BUG_ON(osd_req->r_num_ops > 2); /* * We support a 64-bit length, but ultimately it has to be * passed to blk_end_request(), which takes an unsigned int. */ obj_request->xferred = osd_req->r_reply_op_len[0]; - rbd_assert(obj_request->xferred < (u64) UINT_MAX); - opcode = osd_req->r_request_ops[0].op; + rbd_assert(obj_request->xferred < (u64)UINT_MAX); + opcode = osd_req->r_ops[0].op; switch (opcode) { case CEPH_OSD_OP_READ: rbd_osd_read_callback(obj_request); @@ -1375,28 +1533,49 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, rbd_obj_request_complete(obj_request); } +static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + u64 snap_id; + + rbd_assert(osd_req != NULL); + + snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; + ceph_osdc_build_request(osd_req, obj_request->offset, + NULL, snap_id, NULL); +} + +static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + struct ceph_snap_context *snapc; + struct timespec mtime = CURRENT_TIME; + + rbd_assert(osd_req != NULL); + + snapc = img_request ? img_request->snapc : NULL; + ceph_osdc_build_request(osd_req, obj_request->offset, + snapc, CEPH_NOSNAP, &mtime); +} + static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, bool write_request, - struct rbd_obj_request *obj_request, - struct ceph_osd_req_op *op) + struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_snap_context *snapc = NULL; struct ceph_osd_client *osdc; struct ceph_osd_request *osd_req; - struct timespec now; - struct timespec *mtime; - u64 snap_id = CEPH_NOSNAP; - u64 offset = obj_request->offset; - u64 length = obj_request->length; - if (img_request) { - rbd_assert(img_request->write_request == write_request); - if (img_request->write_request) + if (obj_request_img_data_test(obj_request)) { + struct rbd_img_request *img_request = obj_request->img_request; + + rbd_assert(write_request == + img_request_write_test(img_request)); + if (write_request) snapc = img_request->snapc; - else - snap_id = img_request->snap_id; } /* Allocate and initialize the request, for the single op */ @@ -1406,31 +1585,10 @@ static struct ceph_osd_request *rbd_osd_req_create( if (!osd_req) return NULL; /* ENOMEM */ - rbd_assert(obj_request_type_valid(obj_request->type)); - switch (obj_request->type) { - case OBJ_REQUEST_NODATA: - break; /* Nothing to do */ - case OBJ_REQUEST_BIO: - rbd_assert(obj_request->bio_list != NULL); - osd_req->r_bio = obj_request->bio_list; - break; - case OBJ_REQUEST_PAGES: - osd_req->r_pages = obj_request->pages; - osd_req->r_num_pages = obj_request->page_count; - osd_req->r_page_alignment = offset & ~PAGE_MASK; - break; - } - - if (write_request) { + if (write_request) osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - now = CURRENT_TIME; - mtime = &now; - } else { + else osd_req->r_flags = CEPH_OSD_FLAG_READ; - mtime = NULL; /* not needed for reads */ - offset = 0; /* These are not used... */ - length = 0; /* ...for osd read requests */ - } osd_req->r_callback = rbd_osd_req_callback; osd_req->r_priv = obj_request; @@ -1441,14 +1599,51 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_file_layout = rbd_dev->layout; /* struct */ - /* osd_req will get its own reference to snapc (if non-null) */ + return osd_req; +} - ceph_osdc_build_request(osd_req, offset, length, 1, op, - snapc, snap_id, mtime); +/* + * Create a copyup osd request based on the information in the + * object request supplied. A copyup request has two osd ops, + * a copyup method call, and a "normal" write request. + */ +static struct ceph_osd_request * +rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct ceph_snap_context *snapc; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct ceph_osd_request *osd_req; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + rbd_assert(img_request_write_test(img_request)); + + /* Allocate and initialize the request, for the two ops */ + + snapc = img_request->snapc; + rbd_dev = img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); + if (!osd_req) + return NULL; /* ENOMEM */ + + osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_priv = obj_request; + + osd_req->r_oid_len = strlen(obj_request->object_name); + rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); + memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); + + osd_req->r_file_layout = rbd_dev->layout; /* struct */ return osd_req; } + static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); @@ -1475,10 +1670,10 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, obj_request->object_name = memcpy(name, object_name, size); obj_request->offset = offset; obj_request->length = length; + obj_request->flags = 0; obj_request->which = BAD_WHICH; obj_request->type = type; INIT_LIST_HEAD(&obj_request->links); - obj_request_done_init(obj_request); init_completion(&obj_request->completion); kref_init(&obj_request->kref); @@ -1528,7 +1723,8 @@ static void rbd_obj_request_destroy(struct kref *kref) static struct rbd_img_request *rbd_img_request_create( struct rbd_device *rbd_dev, u64 offset, u64 length, - bool write_request) + bool write_request, + bool child_request) { struct rbd_img_request *img_request; struct ceph_snap_context *snapc = NULL; @@ -1545,216 +1741,712 @@ static struct rbd_img_request *rbd_img_request_create( kfree(img_request); return NULL; /* Shouldn't happen */ } + } img_request->rq = NULL; img_request->rbd_dev = rbd_dev; img_request->offset = offset; img_request->length = length; - img_request->write_request = write_request; - if (write_request) + img_request->flags = 0; + if (write_request) { + img_request_write_set(img_request); img_request->snapc = snapc; - else + } else { img_request->snap_id = rbd_dev->spec->snap_id; + } + if (child_request) + img_request_child_set(img_request); + if (rbd_dev->parent_spec) + img_request_layered_set(img_request); spin_lock_init(&img_request->completion_lock); img_request->next_completion = 0; img_request->callback = NULL; + img_request->result = 0; img_request->obj_request_count = 0; INIT_LIST_HEAD(&img_request->obj_requests); kref_init(&img_request->kref); - rbd_img_request_get(img_request); /* Avoid a warning */ - rbd_img_request_put(img_request); /* TEMPORARY */ + rbd_img_request_get(img_request); /* Avoid a warning */ + rbd_img_request_put(img_request); /* TEMPORARY */ + + dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, + write_request ? "write" : "read", offset, length, + img_request); + + return img_request; +} + +static void rbd_img_request_destroy(struct kref *kref) +{ + struct rbd_img_request *img_request; + struct rbd_obj_request *obj_request; + struct rbd_obj_request *next_obj_request; + + img_request = container_of(kref, struct rbd_img_request, kref); + + dout("%s: img %p\n", __func__, img_request); + + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_img_obj_request_del(img_request, obj_request); + rbd_assert(img_request->obj_request_count == 0); + + if (img_request_write_test(img_request)) + ceph_put_snap_context(img_request->snapc); + + if (img_request_child_test(img_request)) + rbd_obj_request_put(img_request->obj_request); + + kfree(img_request); +} + +static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + unsigned int xferred; + int result; + bool more; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + + rbd_assert(obj_request->xferred <= (u64)UINT_MAX); + xferred = (unsigned int)obj_request->xferred; + result = obj_request->result; + if (result) { + struct rbd_device *rbd_dev = img_request->rbd_dev; + + rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n", + img_request_write_test(img_request) ? "write" : "read", + obj_request->length, obj_request->img_offset, + obj_request->offset); + rbd_warn(rbd_dev, " result %d xferred %x\n", + result, xferred); + if (!img_request->result) + img_request->result = result; + } + + /* Image object requests don't own their page array */ + + if (obj_request->type == OBJ_REQUEST_PAGES) { + obj_request->pages = NULL; + obj_request->page_count = 0; + } + + if (img_request_child_test(img_request)) { + rbd_assert(img_request->obj_request != NULL); + more = obj_request->which < img_request->obj_request_count - 1; + } else { + rbd_assert(img_request->rq != NULL); + more = blk_end_request(img_request->rq, result, xferred); + } + + return more; +} + +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + u32 which = obj_request->which; + bool more = true; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + rbd_assert(img_request != NULL); + rbd_assert(img_request->obj_request_count > 0); + rbd_assert(which != BAD_WHICH); + rbd_assert(which < img_request->obj_request_count); + rbd_assert(which >= img_request->next_completion); + + spin_lock_irq(&img_request->completion_lock); + if (which != img_request->next_completion) + goto out; + + for_each_obj_request_from(img_request, obj_request) { + rbd_assert(more); + rbd_assert(which < img_request->obj_request_count); + + if (!obj_request_done_test(obj_request)) + break; + more = rbd_img_obj_end_request(obj_request); + which++; + } + + rbd_assert(more ^ (which == img_request->obj_request_count)); + img_request->next_completion = which; +out: + spin_unlock_irq(&img_request->completion_lock); + + if (!more) + rbd_img_request_complete(img_request); +} + +/* + * Split up an image request into one or more object requests, each + * to a different object. The "type" parameter indicates whether + * "data_desc" is the pointer to the head of a list of bio + * structures, or the base of a page array. In either case this + * function assumes data_desc describes memory sufficient to hold + * all data described by the image request. + */ +static int rbd_img_request_fill(struct rbd_img_request *img_request, + enum obj_request_type type, + void *data_desc) +{ + struct rbd_device *rbd_dev = img_request->rbd_dev; + struct rbd_obj_request *obj_request = NULL; + struct rbd_obj_request *next_obj_request; + bool write_request = img_request_write_test(img_request); + struct bio *bio_list; + unsigned int bio_offset = 0; + struct page **pages; + u64 img_offset; + u64 resid; + u16 opcode; + + dout("%s: img %p type %d data_desc %p\n", __func__, img_request, + (int)type, data_desc); + + opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ; + img_offset = img_request->offset; + resid = img_request->length; + rbd_assert(resid > 0); + + if (type == OBJ_REQUEST_BIO) { + bio_list = data_desc; + rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT); + } else { + rbd_assert(type == OBJ_REQUEST_PAGES); + pages = data_desc; + } + + while (resid) { + struct ceph_osd_request *osd_req; + const char *object_name; + u64 offset; + u64 length; + + object_name = rbd_segment_name(rbd_dev, img_offset); + if (!object_name) + goto out_unwind; + offset = rbd_segment_offset(rbd_dev, img_offset); + length = rbd_segment_length(rbd_dev, img_offset, resid); + obj_request = rbd_obj_request_create(object_name, + offset, length, type); + kfree(object_name); /* object request has its own copy */ + if (!obj_request) + goto out_unwind; + + if (type == OBJ_REQUEST_BIO) { + unsigned int clone_size; + + rbd_assert(length <= (u64)UINT_MAX); + clone_size = (unsigned int)length; + obj_request->bio_list = + bio_chain_clone_range(&bio_list, + &bio_offset, + clone_size, + GFP_ATOMIC); + if (!obj_request->bio_list) + goto out_partial; + } else { + unsigned int page_count; + + obj_request->pages = pages; + page_count = (u32)calc_pages_for(offset, length); + obj_request->page_count = page_count; + if ((offset + length) & ~PAGE_MASK) + page_count--; /* more on last page */ + pages += page_count; + } + + osd_req = rbd_osd_req_create(rbd_dev, write_request, + obj_request); + if (!osd_req) + goto out_partial; + obj_request->osd_req = osd_req; + obj_request->callback = rbd_img_obj_callback; + + osd_req_op_extent_init(osd_req, 0, opcode, offset, length, + 0, 0); + if (type == OBJ_REQUEST_BIO) + osd_req_op_extent_osd_data_bio(osd_req, 0, + obj_request->bio_list, length); + else + osd_req_op_extent_osd_data_pages(osd_req, 0, + obj_request->pages, length, + offset & ~PAGE_MASK, false, false); + + if (write_request) + rbd_osd_req_format_write(obj_request); + else + rbd_osd_req_format_read(obj_request); + + obj_request->img_offset = img_offset; + rbd_img_obj_request_add(img_request, obj_request); + + img_offset += length; + resid -= length; + } + + return 0; + +out_partial: + rbd_obj_request_put(obj_request); +out_unwind: + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_obj_request_put(obj_request); + + return -ENOMEM; +} + +static void +rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct rbd_device *rbd_dev; + u64 length; + u32 page_count; + + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + length = (u64)1 << rbd_dev->header.obj_order; + page_count = (u32)calc_pages_for(0, length); + + rbd_assert(obj_request->copyup_pages); + ceph_release_page_vector(obj_request->copyup_pages, page_count); + obj_request->copyup_pages = NULL; + + /* + * We want the transfer count to reflect the size of the + * original write request. There is no such thing as a + * successful short write, so if the request was successful + * we can just set it to the originally-requested length. + */ + if (!obj_request->result) + obj_request->xferred = obj_request->length; + + /* Finish up with the normal image object callback */ + + rbd_img_obj_callback(obj_request); +} + +static void +rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *orig_request; + struct ceph_osd_request *osd_req; + struct ceph_osd_client *osdc; + struct rbd_device *rbd_dev; + struct page **pages; + int result; + u64 obj_size; + u64 xferred; + + rbd_assert(img_request_child_test(img_request)); + + /* First get what we need from the image request */ + + pages = img_request->copyup_pages; + rbd_assert(pages != NULL); + img_request->copyup_pages = NULL; + + orig_request = img_request->obj_request; + rbd_assert(orig_request != NULL); + rbd_assert(orig_request->type == OBJ_REQUEST_BIO); + result = img_request->result; + obj_size = img_request->length; + xferred = img_request->xferred; + + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order); + + rbd_img_request_put(img_request); + + if (result) + goto out_err; + + /* Allocate the new copyup osd request for the original request */ - dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, - write_request ? "write" : "read", offset, length, - img_request); + result = -ENOMEM; + rbd_assert(!orig_request->osd_req); + osd_req = rbd_osd_req_create_copyup(orig_request); + if (!osd_req) + goto out_err; + orig_request->osd_req = osd_req; + orig_request->copyup_pages = pages; - return img_request; -} + /* Initialize the copyup op */ -static void rbd_img_request_destroy(struct kref *kref) -{ - struct rbd_img_request *img_request; - struct rbd_obj_request *obj_request; - struct rbd_obj_request *next_obj_request; + osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); + osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0, + false, false); - img_request = container_of(kref, struct rbd_img_request, kref); + /* Then the original write request op */ - dout("%s: img %p\n", __func__, img_request); + osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, + orig_request->offset, + orig_request->length, 0, 0); + osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list, + orig_request->length); - for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_img_obj_request_del(img_request, obj_request); - rbd_assert(img_request->obj_request_count == 0); + rbd_osd_req_format_write(orig_request); - if (img_request->write_request) - ceph_put_snap_context(img_request->snapc); + /* All set, send it off. */ - kfree(img_request); + orig_request->callback = rbd_img_obj_copyup_callback; + osdc = &rbd_dev->rbd_client->client->osdc; + result = rbd_obj_request_submit(osdc, orig_request); + if (!result) + return; +out_err: + /* Record the error code and complete the request */ + + orig_request->result = result; + orig_request->xferred = 0; + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); } -static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, - struct bio *bio_list) +/* + * Read from the parent image the range of data that covers the + * entire target of the given object request. This is used for + * satisfying a layered image write request when the target of an + * object request from the image request does not exist. + * + * A page array big enough to hold the returned data is allocated + * and supplied to rbd_img_request_fill() as the "data descriptor." + * When the read completes, this page array will be transferred to + * the original object request for the copyup operation. + * + * If an error occurs, record it as the result of the original + * object request and mark it done so it gets completed. + */ +static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - struct rbd_obj_request *obj_request = NULL; - struct rbd_obj_request *next_obj_request; - unsigned int bio_offset; - u64 image_offset; - u64 resid; - u16 opcode; + struct rbd_img_request *img_request = NULL; + struct rbd_img_request *parent_request = NULL; + struct rbd_device *rbd_dev; + u64 img_offset; + u64 length; + struct page **pages = NULL; + u32 page_count; + int result; - dout("%s: img %p bio %p\n", __func__, img_request, bio_list); + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); - opcode = img_request->write_request ? CEPH_OSD_OP_WRITE - : CEPH_OSD_OP_READ; - bio_offset = 0; - image_offset = img_request->offset; - rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT); - resid = img_request->length; - rbd_assert(resid > 0); - while (resid) { - const char *object_name; - unsigned int clone_size; - struct ceph_osd_req_op *op; - u64 offset; - u64 length; + img_request = obj_request->img_request; + rbd_assert(img_request != NULL); + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); - object_name = rbd_segment_name(rbd_dev, image_offset); - if (!object_name) - goto out_unwind; - offset = rbd_segment_offset(rbd_dev, image_offset); - length = rbd_segment_length(rbd_dev, image_offset, resid); - obj_request = rbd_obj_request_create(object_name, - offset, length, - OBJ_REQUEST_BIO); - kfree(object_name); /* object request has its own copy */ - if (!obj_request) - goto out_unwind; + /* + * First things first. The original osd request is of no + * use to use any more, we'll need a new one that can hold + * the two ops in a copyup request. We'll get that later, + * but for now we can release the old one. + */ + rbd_osd_req_destroy(obj_request->osd_req); + obj_request->osd_req = NULL; - rbd_assert(length <= (u64) UINT_MAX); - clone_size = (unsigned int) length; - obj_request->bio_list = bio_chain_clone_range(&bio_list, - &bio_offset, clone_size, - GFP_ATOMIC); - if (!obj_request->bio_list) - goto out_partial; + /* + * Determine the byte range covered by the object in the + * child image to which the original request was to be sent. + */ + img_offset = obj_request->img_offset - obj_request->offset; + length = (u64)1 << rbd_dev->header.obj_order; - /* - * Build up the op to use in building the osd - * request. Note that the contents of the op are - * copied by rbd_osd_req_create(). - */ - op = rbd_osd_req_op_create(opcode, offset, length); - if (!op) - goto out_partial; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, - img_request->write_request, - obj_request, op); - rbd_osd_req_op_destroy(op); - if (!obj_request->osd_req) - goto out_partial; - /* status and version are initially zero-filled */ + /* + * Allocate a page array big enough to receive the data read + * from the parent. + */ + page_count = (u32)calc_pages_for(0, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) { + result = PTR_ERR(pages); + pages = NULL; + goto out_err; + } - rbd_img_obj_request_add(img_request, obj_request); + result = -ENOMEM; + parent_request = rbd_img_request_create(rbd_dev->parent, + img_offset, length, + false, true); + if (!parent_request) + goto out_err; + rbd_obj_request_get(obj_request); + parent_request->obj_request = obj_request; - image_offset += length; - resid -= length; - } + result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); + if (result) + goto out_err; + parent_request->copyup_pages = pages; - return 0; + parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); + if (!result) + return 0; -out_partial: + parent_request->copyup_pages = NULL; + parent_request->obj_request = NULL; rbd_obj_request_put(obj_request); -out_unwind: - for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_obj_request_put(obj_request); +out_err: + if (pages) + ceph_release_page_vector(pages, page_count); + if (parent_request) + rbd_img_request_put(parent_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); - return -ENOMEM; + return result; } -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request; - u32 which = obj_request->which; - bool more = true; + struct rbd_obj_request *orig_request; + int result; - img_request = obj_request->img_request; + rbd_assert(!obj_request_img_data_test(obj_request)); - dout("%s: img %p obj %p\n", __func__, img_request, obj_request); - rbd_assert(img_request != NULL); - rbd_assert(img_request->rq != NULL); - rbd_assert(img_request->obj_request_count > 0); - rbd_assert(which != BAD_WHICH); - rbd_assert(which < img_request->obj_request_count); - rbd_assert(which >= img_request->next_completion); + /* + * All we need from the object request is the original + * request and the result of the STAT op. Grab those, then + * we're done with the request. + */ + orig_request = obj_request->obj_request; + obj_request->obj_request = NULL; + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); - spin_lock_irq(&img_request->completion_lock); - if (which != img_request->next_completion) + result = obj_request->result; + obj_request->result = 0; + + dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, + obj_request, orig_request, result, + obj_request->xferred, obj_request->length); + rbd_obj_request_put(obj_request); + + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + + /* + * Our only purpose here is to determine whether the object + * exists, and we don't want to treat the non-existence as + * an error. If something else comes back, transfer the + * error to the original request and complete it now. + */ + if (!result) { + obj_request_existence_set(orig_request, true); + } else if (result == -ENOENT) { + obj_request_existence_set(orig_request, false); + } else if (result) { + orig_request->result = result; goto out; + } - for_each_obj_request_from(img_request, obj_request) { - unsigned int xferred; - int result; + /* + * Resubmit the original request now that we have recorded + * whether the target object exists. + */ + orig_request->result = rbd_img_obj_request_submit(orig_request); +out: + if (orig_request->result) + rbd_obj_request_complete(orig_request); + rbd_obj_request_put(orig_request); +} - rbd_assert(more); - rbd_assert(which < img_request->obj_request_count); +static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *stat_request; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct page **pages = NULL; + u32 page_count; + size_t size; + int ret; - if (!obj_request_done_test(obj_request)) - break; + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); + page_count = (u32)calc_pages_for(0, size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); - rbd_assert(obj_request->xferred <= (u64) UINT_MAX); - xferred = (unsigned int) obj_request->xferred; - result = (int) obj_request->result; - if (result) - rbd_warn(NULL, "obj_request %s result %d xferred %u\n", - img_request->write_request ? "write" : "read", - result, xferred); + ret = -ENOMEM; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + goto out; - more = blk_end_request(img_request->rq, result, xferred); - which++; - } + rbd_obj_request_get(obj_request); + stat_request->obj_request = obj_request; + stat_request->pages = pages; + stat_request->page_count = page_count; + + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, + stat_request); + if (!stat_request->osd_req) + goto out; + stat_request->callback = rbd_img_obj_exists_callback; - rbd_assert(more ^ (which == img_request->obj_request_count)); - img_request->next_completion = which; + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); + rbd_osd_req_format_read(stat_request); + + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, stat_request); out: - spin_unlock_irq(&img_request->completion_lock); + if (ret) + rbd_obj_request_put(obj_request); - if (!more) - rbd_img_request_complete(img_request); + return ret; +} + +static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + bool known; + + rbd_assert(obj_request_img_data_test(obj_request)); + + img_request = obj_request->img_request; + rbd_assert(img_request); + + /* + * Only layered writes need special handling. If it's not a + * layered write, or it is a layered write but we know the + * target object exists, it's no different from any other + * object request. + */ + if (!img_request_write_test(img_request) || + !img_request_layered_test(img_request) || + ((known = obj_request_known_test(obj_request)) && + obj_request_exists_test(obj_request))) { + + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + + rbd_dev = obj_request->img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + + return rbd_obj_request_submit(osdc, obj_request); + } + + /* + * It's a layered write. The target object might exist but + * we may not know that yet. If we know it doesn't exist, + * start by reading the data for the full target object from + * the parent so we can use it for a copyup to the target. + */ + if (known) + return rbd_img_obj_parent_read_full(obj_request); + + /* We don't know whether the target exists. Go find out. */ + + return rbd_img_obj_exists_submit(obj_request); } static int rbd_img_request_submit(struct rbd_img_request *img_request) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; + struct rbd_obj_request *next_obj_request; dout("%s: img %p\n", __func__, img_request); - for_each_obj_request(img_request, obj_request) { + for_each_obj_request_safe(img_request, obj_request, next_obj_request) { int ret; - obj_request->callback = rbd_img_obj_callback; - ret = rbd_obj_request_submit(osdc, obj_request); + ret = rbd_img_obj_request_submit(obj_request); if (ret) return ret; - /* - * The image request has its own reference to each - * of its object requests, so we can safely drop the - * initial one here. - */ - rbd_obj_request_put(obj_request); } return 0; } +static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *obj_request; + + rbd_assert(img_request_child_test(img_request)); + + obj_request = img_request->obj_request; + rbd_assert(obj_request != NULL); + obj_request->result = img_request->result; + obj_request->xferred = img_request->xferred; + + rbd_img_obj_request_read_callback(obj_request); + rbd_obj_request_complete(obj_request); +} + +static void rbd_img_parent_read(struct rbd_obj_request *obj_request) +{ + struct rbd_device *rbd_dev; + struct rbd_img_request *img_request; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->img_request != NULL); + rbd_assert(obj_request->result == (s32) -ENOENT); + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + + rbd_dev = obj_request->img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); + /* rbd_read_finish(obj_request, obj_request->length); */ + img_request = rbd_img_request_create(rbd_dev->parent, + obj_request->img_offset, + obj_request->length, + false, true); + result = -ENOMEM; + if (!img_request) + goto out_err; + + rbd_obj_request_get(obj_request); + img_request->obj_request = obj_request; + + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, + obj_request->bio_list); + if (result) + goto out_err; + + img_request->callback = rbd_img_parent_read_callback; + result = rbd_img_request_submit(img_request); + if (result) + goto out_err; + + return; +out_err: + if (img_request) + rbd_img_request_put(img_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); +} + static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 ver, u64 notify_id) { struct rbd_obj_request *obj_request; - struct ceph_osd_req_op *op; - struct ceph_osd_client *osdc; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, @@ -1763,17 +2455,15 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, return -ENOMEM; ret = -ENOMEM; - op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver); - if (!op) - goto out; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); if (!obj_request->osd_req) goto out; - - osdc = &rbd_dev->rbd_client->client->osdc; obj_request->callback = rbd_obj_request_put; + + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, + notify_id, ver, 0); + rbd_osd_req_format_read(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); out: if (ret) @@ -1810,7 +2500,6 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_req_op *op; int ret; rbd_assert(start ^ !!rbd_dev->watch_event); @@ -1830,14 +2519,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) if (!obj_request) goto out_cancel; - op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, - rbd_dev->header.obj_version, start); - if (!op) - goto out_cancel; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); if (!obj_request->osd_req) goto out_cancel; @@ -1846,6 +2528,12 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) else ceph_osdc_unregister_linger_request(osdc, rbd_dev->watch_request->osd_req); + + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, + rbd_dev->watch_event->cookie, + rbd_dev->header.obj_version, start); + rbd_osd_req_format_write(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) goto out_cancel; @@ -1897,20 +2585,18 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, size_t inbound_size, u64 *version) { + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_client *osdc; - struct ceph_osd_req_op *op; struct page **pages; u32 page_count; int ret; /* - * Method calls are ultimately read operations but they - * don't involve object data (so no offset or length). - * The result should placed into the inbound buffer - * provided. They also supply outbound data--parameters for - * the object method. Currently if this is present it will - * be a snapshot id. + * Method calls are ultimately read operations. The result + * should placed into the inbound buffer provided. They + * also supply outbound data--parameters for the object + * method. Currently if this is present it will be a + * snapshot id. */ page_count = (u32) calc_pages_for(0, inbound_size); pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); @@ -1918,7 +2604,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, return PTR_ERR(pages); ret = -ENOMEM; - obj_request = rbd_obj_request_create(object_name, 0, 0, + obj_request = rbd_obj_request_create(object_name, 0, inbound_size, OBJ_REQUEST_PAGES); if (!obj_request) goto out; @@ -1926,17 +2612,29 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name, - method_name, outbound, outbound_size); - if (!op) - goto out; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); if (!obj_request->osd_req) goto out; - osdc = &rbd_dev->rbd_client->client->osdc; + osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, + class_name, method_name); + if (outbound_size) { + struct ceph_pagelist *pagelist; + + pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); + if (!pagelist) + goto out; + + ceph_pagelist_init(pagelist); + ceph_pagelist_append(pagelist, outbound, outbound_size); + osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, + pagelist); + } + osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, + obj_request->pages, inbound_size, + 0, false, false); + rbd_osd_req_format_read(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) goto out; @@ -2025,13 +2723,14 @@ static void rbd_request_fn(struct request_queue *q) result = -ENOMEM; img_request = rbd_img_request_create(rbd_dev, offset, length, - write_request); + write_request, false); if (!img_request) goto end_request; img_request->rq = rq; - result = rbd_img_request_fill_bio(img_request, rq->bio); + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, + rq->bio); if (!result) result = rbd_img_request_submit(img_request); if (result) @@ -2039,8 +2738,10 @@ static void rbd_request_fn(struct request_queue *q) end_request: spin_lock_irq(q->queue_lock); if (result < 0) { - rbd_warn(rbd_dev, "obj_request %s result %d\n", - write_request ? "write" : "read", result); + rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", + write_request ? "write" : "read", + length, offset, result); + __blk_end_request_all(rq, result); } } @@ -2112,9 +2813,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, char *buf, u64 *version) { - struct ceph_osd_req_op *op; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_client *osdc; struct page **pages = NULL; u32 page_count; size_t size; @@ -2134,16 +2834,19 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); - if (!op) - goto out; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, - obj_request, op); - rbd_osd_req_op_destroy(op); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); if (!obj_request->osd_req) goto out; - osdc = &rbd_dev->rbd_client->client->osdc; + osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, + offset, length, 0, 0); + osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, + obj_request->pages, + obj_request->length, + obj_request->offset & ~PAGE_MASK, + false, false); + rbd_osd_req_format_read(obj_request); + ret = rbd_obj_request_submit(osdc, obj_request); if (ret) goto out; @@ -2337,6 +3040,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) else ret = rbd_dev_v2_refresh(rbd_dev, hver); mutex_unlock(&ctl_mutex); + revalidate_disk(rbd_dev->disk); return ret; } @@ -2680,8 +3384,6 @@ static struct rbd_spec *rbd_spec_alloc(void) return NULL; kref_init(&spec->kref); - rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */ - return spec; } @@ -2913,7 +3615,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, return ret; incompat = le64_to_cpu(features_buf.incompat); - if (incompat & ~RBD_FEATURES_ALL) + if (incompat & ~RBD_FEATURES_SUPPORTED) return -ENXIO; *snap_features = le64_to_cpu(features_buf.features); @@ -3776,6 +4478,11 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) void *response; void *p; + /* If we already have it we don't need to look it up */ + + if (rbd_dev->spec->image_id) + return 0; + /* * When probing a parent image, the image id is already * known (and the image name likely is not). There's no @@ -3953,6 +4660,9 @@ out_err: static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) { + struct rbd_device *parent = NULL; + struct rbd_spec *parent_spec = NULL; + struct rbd_client *rbdc = NULL; int ret; /* no need to lock here, as rbd_dev is not registered yet */ @@ -3997,6 +4707,31 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) * At this point cleanup in the event of an error is the job * of the sysfs code (initiated by rbd_bus_del_dev()). */ + /* Probe the parent if there is one */ + + if (rbd_dev->parent_spec) { + /* + * We need to pass a reference to the client and the + * parent spec when creating the parent rbd_dev. + * Images related by parent/child relationships + * always share both. + */ + parent_spec = rbd_spec_get(rbd_dev->parent_spec); + rbdc = __rbd_get_client(rbd_dev->rbd_client); + + parent = rbd_dev_create(rbdc, parent_spec); + if (!parent) { + ret = -ENOMEM; + goto err_out_spec; + } + rbdc = NULL; /* parent now owns reference */ + parent_spec = NULL; /* parent now owns reference */ + ret = rbd_dev_probe(parent); + if (ret < 0) + goto err_out_parent; + rbd_dev->parent = parent; + } + down_write(&rbd_dev->header_rwsem); ret = rbd_dev_snaps_register(rbd_dev); up_write(&rbd_dev->header_rwsem); @@ -4015,6 +4750,12 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) (unsigned long long) rbd_dev->mapping.size); return ret; + +err_out_parent: + rbd_dev_destroy(parent); +err_out_spec: + rbd_spec_put(parent_spec); + rbd_put_client(rbdc); err_out_bus: /* this will also clean up rest of rbd_dev stuff */ @@ -4178,6 +4919,12 @@ static void rbd_dev_release(struct device *dev) module_put(THIS_MODULE); } +static void __rbd_remove(struct rbd_device *rbd_dev) +{ + rbd_remove_all_snaps(rbd_dev); + rbd_bus_del_dev(rbd_dev); +} + static ssize_t rbd_remove(struct bus_type *bus, const char *buf, size_t count) @@ -4213,8 +4960,26 @@ static ssize_t rbd_remove(struct bus_type *bus, if (ret < 0) goto done; - rbd_remove_all_snaps(rbd_dev); - rbd_bus_del_dev(rbd_dev); + while (rbd_dev->parent_spec) { + struct rbd_device *first = rbd_dev; + struct rbd_device *second = first->parent; + struct rbd_device *third; + + /* + * Follow to the parent with no grandparent and + * remove it. + */ + while (second && (third = second->parent)) { + first = second; + second = third; + } + __rbd_remove(second); + rbd_spec_put(first->parent_spec); + first->parent_spec = NULL; + first->parent_overlap = 0; + first->parent = NULL; + } + __rbd_remove(rbd_dev); done: mutex_unlock(&ctl_mutex);