enum obj_req_flags {
OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
+ OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
+ OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
u64 length; /* bytes from offset */
unsigned long flags;
- struct rbd_img_request *img_request;
- u64 img_offset; /* image relative offset */
- struct list_head links; /* img_request->obj_requests */
+ /*
+ * An object request associated with an image will have its
+ * img_data flag set; a standalone object request will not.
+ *
+ * A standalone object request will have which == BAD_WHICH
+ * and a null obj_request pointer.
+ *
+ * An object request initiated in support of a layered image
+ * object (to check for its existence before a write) will
+ * have which == BAD_WHICH and a non-null obj_request pointer.
+ *
+ * Finally, an object request for rbd image data will have
+ * which != BAD_WHICH, and will have a non-null img_request
+ * pointer. The value of which will be in the range
+ * 0..(img_request->obj_request_count-1).
+ */
+ union {
+ struct rbd_obj_request *obj_request; /* STAT op */
+ struct {
+ struct rbd_img_request *img_request;
+ u64 img_offset;
+ /* links for img_request->obj_requests list */
+ struct list_head links;
+ };
+ };
u32 which; /* posn image request list */
enum obj_request_type type;
u32 page_count;
};
};
+ struct page **copyup_pages;
struct ceph_osd_request *osd_req;
struct request *rq; /* block request */
struct rbd_obj_request *obj_request; /* obj req initiator */
};
+ struct page **copyup_pages;
spinlock_t completion_lock;/* protects next_completion */
u32 next_completion;
rbd_img_callback_t callback;
struct rbd_spec *parent_spec;
u64 parent_overlap;
+ struct rbd_device *parent;
/* protects updating the header */
struct rw_semaphore header_rwsem;
static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
+
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
size_t count);
+static int rbd_dev_probe(struct rbd_device *rbd_dev);
static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
# define rbd_assert(expr) ((void) 0)
#endif /* !RBD_DEBUG */
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
+
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
return ERR_PTR(ret);
}
+static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
+{
+ kref_get(&rbdc->kref);
+
+ return rbdc;
+}
+
/*
* Find a ceph client with specific addr and configuration. If
* found, bump its reference count.
spin_lock(&rbd_client_list_lock);
list_for_each_entry(client_node, &rbd_client_list, node) {
if (!ceph_compare_options(ceph_opts, client_node->client)) {
- kref_get(&client_node->kref);
+ __rbd_get_client(client_node);
+
found = true;
break;
}
}
}
+/*
+ * similar to zero_bio_chain(), zeros data defined by a page array,
+ * starting at the given byte offset from the start of the array and
+ * continuing up to the given end offset. The pages array is
+ * assumed to be big enough to hold all bytes up to the end.
+ */
+static void zero_pages(struct page **pages, u64 offset, u64 end)
+{
+ struct page **page = &pages[offset >> PAGE_SHIFT];
+
+ rbd_assert(end > offset);
+ rbd_assert(end - offset <= (u64)SIZE_MAX);
+ while (offset < end) {
+ size_t page_offset;
+ size_t length;
+ unsigned long flags;
+ void *kaddr;
+
+ page_offset = (size_t)(offset & ~PAGE_MASK);
+ length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
+ local_irq_save(flags);
+ kaddr = kmap_atomic(*page);
+ memset(kaddr + page_offset, 0, length);
+ kunmap_atomic(kaddr);
+ local_irq_restore(flags);
+
+ offset += length;
+ page++;
+ }
+}
+
/*
* Clone a portion of a bio, starting at the given byte offset
* and continuing for the number of bytes indicated.
* each flag, once its value is set to 1 it is never reset to 0
* again.
*/
+static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
+{
+ if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
+ struct rbd_device *rbd_dev;
+
+ rbd_dev = obj_request->img_request->rbd_dev;
+ rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+ obj_request);
+ }
+}
+
+static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+{
+ smp_mb();
+ return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
+}
+
static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
- struct rbd_img_request *img_request = obj_request->img_request;
- struct rbd_device *rbd_dev;
+ struct rbd_device *rbd_dev = NULL;
- rbd_dev = img_request ? img_request->rbd_dev : NULL;
+ if (obj_request_img_data_test(obj_request))
+ rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked done\n",
obj_request);
}
return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
-static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
+/*
+ * This sets the KNOWN flag after (possibly) setting the EXISTS
+ * flag. The latter is set based on the "exists" value provided.
+ *
+ * Note that for our purposes once an object exists it never goes
+ * away again. It's possible that the response from two existence
+ * checks are separated by the creation of the target object, and
+ * the first ("doesn't exist") response arrives *after* the second
+ * ("does exist"). In that case we ignore the second one.
+ */
+static void obj_request_existence_set(struct rbd_obj_request *obj_request,
+ bool exists)
{
- if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
- struct rbd_img_request *img_request = obj_request->img_request;
- struct rbd_device *rbd_dev;
+ if (exists)
+ set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
+ set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
+ smp_mb();
+}
- rbd_dev = img_request ? img_request->rbd_dev : NULL;
- rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
- obj_request);
- }
+static bool obj_request_known_test(struct rbd_obj_request *obj_request)
+{
+ smp_mb();
+ return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}
-static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
smp_mb();
- return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
+ return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
rbd_assert(obj_request->img_request == NULL);
- rbd_obj_request_get(obj_request);
+ /* Image request now owns object's original reference */
obj_request->img_request = img_request;
obj_request->which = img_request->obj_request_count;
rbd_assert(!obj_request_img_data_test(obj_request));
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
+ u64 xferred = obj_request->xferred;
+ u64 length = obj_request->length;
+
dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
obj_request, obj_request->img_request, obj_request->result,
- obj_request->xferred, obj_request->length);
+ xferred, length);
/*
* ENOENT means a hole in the image. We zero-fill the
* entire length of the request. A short read also implies
* update the xferred count to indicate the whole request
* was satisfied.
*/
- BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
+ rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
if (obj_request->result == -ENOENT) {
- zero_bio_chain(obj_request->bio_list, 0);
+ if (obj_request->type == OBJ_REQUEST_BIO)
+ zero_bio_chain(obj_request->bio_list, 0);
+ else
+ zero_pages(obj_request->pages, 0, length);
obj_request->result = 0;
- obj_request->xferred = obj_request->length;
- } else if (obj_request->xferred < obj_request->length &&
- !obj_request->result) {
- zero_bio_chain(obj_request->bio_list, obj_request->xferred);
- obj_request->xferred = obj_request->length;
+ obj_request->xferred = length;
+ } else if (xferred < length && !obj_request->result) {
+ if (obj_request->type == OBJ_REQUEST_BIO)
+ zero_bio_chain(obj_request->bio_list, xferred);
+ else
+ zero_pages(obj_request->pages, xferred, length);
+ obj_request->xferred = length;
}
obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
- dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
- obj_request->result, obj_request->xferred, obj_request->length);
- if (obj_request->img_request)
+ struct rbd_img_request *img_request = NULL;
+ bool layered = false;
+
+ if (obj_request_img_data_test(obj_request)) {
+ img_request = obj_request->img_request;
+ layered = img_request && img_request_layered_test(img_request);
+ } else {
+ img_request = NULL;
+ layered = false;
+ }
+
+ dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
+ obj_request, img_request, obj_request->result,
+ obj_request->xferred, obj_request->length);
+ if (layered && obj_request->result == -ENOENT)
+ rbd_img_parent_read(obj_request);
+ else if (img_request)
rbd_img_obj_request_read_callback(obj_request);
else
obj_request_done_set(obj_request);
dout("%s: obj %p result %d %llu\n", __func__, obj_request,
obj_request->result, obj_request->length);
/*
- * There is no such thing as a successful short write.
- * Our xferred value is the number of bytes transferred
- * back. Set it to our originally-requested length.
+ * There is no such thing as a successful short write. Set
+ * it to our originally-requested length.
*/
obj_request->xferred = obj_request->length;
obj_request_done_set(obj_request);
dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
rbd_assert(osd_req == obj_request->osd_req);
- rbd_assert(obj_request_img_data_test(obj_request) ^
- !obj_request->img_request);
- rbd_assert(obj_request_img_data_test(obj_request) ^
- (obj_request->which == BAD_WHICH));
+ if (obj_request_img_data_test(obj_request)) {
+ rbd_assert(obj_request->img_request);
+ rbd_assert(obj_request->which != BAD_WHICH);
+ } else {
+ rbd_assert(obj_request->which == BAD_WHICH);
+ }
if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result;
obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
- WARN_ON(osd_req->r_num_ops != 1); /* For now */
+ BUG_ON(osd_req->r_num_ops > 2);
/*
* We support a 64-bit length, but ultimately it has to be
* passed to blk_end_request(), which takes an unsigned int.
*/
obj_request->xferred = osd_req->r_reply_op_len[0];
- rbd_assert(obj_request->xferred < (u64) UINT_MAX);
+ rbd_assert(obj_request->xferred < (u64)UINT_MAX);
opcode = osd_req->r_ops[0].op;
switch (opcode) {
case CEPH_OSD_OP_READ:
rbd_obj_request_complete(obj_request);
}
-static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
- bool write_request)
+static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
- struct ceph_snap_context *snapc = NULL;
- u64 snap_id = CEPH_NOSNAP;
- struct timespec *mtime = NULL;
- struct timespec now;
+ u64 snap_id;
rbd_assert(osd_req != NULL);
- if (write_request) {
- now = CURRENT_TIME;
- mtime = &now;
- if (img_request)
- snapc = img_request->snapc;
- } else if (img_request) {
- snap_id = img_request->snap_id;
- }
+ snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
+ ceph_osdc_build_request(osd_req, obj_request->offset,
+ NULL, snap_id, NULL);
+}
+
+static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request = obj_request->img_request;
+ struct ceph_osd_request *osd_req = obj_request->osd_req;
+ struct ceph_snap_context *snapc;
+ struct timespec mtime = CURRENT_TIME;
+
+ rbd_assert(osd_req != NULL);
+
+ snapc = img_request ? img_request->snapc : NULL;
ceph_osdc_build_request(osd_req, obj_request->offset,
- snapc, snap_id, mtime);
+ snapc, CEPH_NOSNAP, &mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
return osd_req;
}
+/*
+ * Create a copyup osd request based on the information in the
+ * object request supplied. A copyup request has two osd ops,
+ * a copyup method call, and a "normal" write request.
+ */
+static struct ceph_osd_request *
+rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request;
+ struct ceph_snap_context *snapc;
+ struct rbd_device *rbd_dev;
+ struct ceph_osd_client *osdc;
+ struct ceph_osd_request *osd_req;
+
+ rbd_assert(obj_request_img_data_test(obj_request));
+ img_request = obj_request->img_request;
+ rbd_assert(img_request);
+ rbd_assert(img_request_write_test(img_request));
+
+ /* Allocate and initialize the request, for the two ops */
+
+ snapc = img_request->snapc;
+ rbd_dev = img_request->rbd_dev;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
+ if (!osd_req)
+ return NULL; /* ENOMEM */
+
+ osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+ osd_req->r_callback = rbd_osd_req_callback;
+ osd_req->r_priv = obj_request;
+
+ osd_req->r_oid_len = strlen(obj_request->object_name);
+ rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+ memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
+
+ osd_req->r_file_layout = rbd_dev->layout; /* struct */
+
+ return osd_req;
+}
+
+
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
ceph_osdc_put_request(osd_req);
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);
- (void) img_request_layered_test(img_request); /* Avoid a warning */
rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */
if (img_request_write_test(img_request))
ceph_put_snap_context(img_request->snapc);
+ if (img_request_child_test(img_request))
+ rbd_obj_request_put(img_request->obj_request);
+
kfree(img_request);
}
struct rbd_img_request *img_request;
unsigned int xferred;
int result;
+ bool more;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
- rbd_assert(!img_request_child_test(img_request));
- rbd_assert(img_request->rq != NULL);
-
rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
xferred = (unsigned int)obj_request->xferred;
result = obj_request->result;
img_request->result = result;
}
- return blk_end_request(img_request->rq, result, xferred);
+ /* Image object requests don't own their page array */
+
+ if (obj_request->type == OBJ_REQUEST_PAGES) {
+ obj_request->pages = NULL;
+ obj_request->page_count = 0;
+ }
+
+ if (img_request_child_test(img_request)) {
+ rbd_assert(img_request->obj_request != NULL);
+ more = obj_request->which < img_request->obj_request_count - 1;
+ } else {
+ rbd_assert(img_request->rq != NULL);
+ more = blk_end_request(img_request->rq, result, xferred);
+ }
+
+ return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
rbd_img_request_complete(img_request);
}
-static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
- struct bio *bio_list)
+/*
+ * Split up an image request into one or more object requests, each
+ * to a different object. The "type" parameter indicates whether
+ * "data_desc" is the pointer to the head of a list of bio
+ * structures, or the base of a page array. In either case this
+ * function assumes data_desc describes memory sufficient to hold
+ * all data described by the image request.
+ */
+static int rbd_img_request_fill(struct rbd_img_request *img_request,
+ enum obj_request_type type,
+ void *data_desc)
{
struct rbd_device *rbd_dev = img_request->rbd_dev;
struct rbd_obj_request *obj_request = NULL;
struct rbd_obj_request *next_obj_request;
bool write_request = img_request_write_test(img_request);
- unsigned int bio_offset;
+ struct bio *bio_list;
+ unsigned int bio_offset = 0;
+ struct page **pages;
u64 img_offset;
u64 resid;
u16 opcode;
- dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+ dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
+ (int)type, data_desc);
opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
- bio_offset = 0;
img_offset = img_request->offset;
- rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
resid = img_request->length;
rbd_assert(resid > 0);
+
+ if (type == OBJ_REQUEST_BIO) {
+ bio_list = data_desc;
+ rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
+ } else {
+ rbd_assert(type == OBJ_REQUEST_PAGES);
+ pages = data_desc;
+ }
+
while (resid) {
struct ceph_osd_request *osd_req;
const char *object_name;
- unsigned int clone_size;
u64 offset;
u64 length;
offset = rbd_segment_offset(rbd_dev, img_offset);
length = rbd_segment_length(rbd_dev, img_offset, resid);
obj_request = rbd_obj_request_create(object_name,
- offset, length,
- OBJ_REQUEST_BIO);
+ offset, length, type);
kfree(object_name); /* object request has its own copy */
if (!obj_request)
goto out_unwind;
- rbd_assert(length <= (u64) UINT_MAX);
- clone_size = (unsigned int) length;
- obj_request->bio_list = bio_chain_clone_range(&bio_list,
- &bio_offset, clone_size,
- GFP_ATOMIC);
- if (!obj_request->bio_list)
- goto out_partial;
+ if (type == OBJ_REQUEST_BIO) {
+ unsigned int clone_size;
+
+ rbd_assert(length <= (u64)UINT_MAX);
+ clone_size = (unsigned int)length;
+ obj_request->bio_list =
+ bio_chain_clone_range(&bio_list,
+ &bio_offset,
+ clone_size,
+ GFP_ATOMIC);
+ if (!obj_request->bio_list)
+ goto out_partial;
+ } else {
+ unsigned int page_count;
+
+ obj_request->pages = pages;
+ page_count = (u32)calc_pages_for(offset, length);
+ obj_request->page_count = page_count;
+ if ((offset + length) & ~PAGE_MASK)
+ page_count--; /* more on last page */
+ pages += page_count;
+ }
osd_req = rbd_osd_req_create(rbd_dev, write_request,
obj_request);
osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
0, 0);
- osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
- obj_request->bio_list, obj_request->length);
- rbd_osd_req_format(obj_request, write_request);
+ if (type == OBJ_REQUEST_BIO)
+ osd_req_op_extent_osd_data_bio(osd_req, 0,
+ obj_request->bio_list, length);
+ else
+ osd_req_op_extent_osd_data_pages(osd_req, 0,
+ obj_request->pages, length,
+ offset & ~PAGE_MASK, false, false);
+
+ if (write_request)
+ rbd_osd_req_format_write(obj_request);
+ else
+ rbd_osd_req_format_read(obj_request);
obj_request->img_offset = img_offset;
rbd_img_obj_request_add(img_request, obj_request);
return -ENOMEM;
}
+static void
+rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request;
+ struct rbd_device *rbd_dev;
+ u64 length;
+ u32 page_count;
+
+ rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_img_data_test(obj_request));
+ img_request = obj_request->img_request;
+ rbd_assert(img_request);
+
+ rbd_dev = img_request->rbd_dev;
+ rbd_assert(rbd_dev);
+ length = (u64)1 << rbd_dev->header.obj_order;
+ page_count = (u32)calc_pages_for(0, length);
+
+ rbd_assert(obj_request->copyup_pages);
+ ceph_release_page_vector(obj_request->copyup_pages, page_count);
+ obj_request->copyup_pages = NULL;
+
+ /*
+ * We want the transfer count to reflect the size of the
+ * original write request. There is no such thing as a
+ * successful short write, so if the request was successful
+ * we can just set it to the originally-requested length.
+ */
+ if (!obj_request->result)
+ obj_request->xferred = obj_request->length;
+
+ /* Finish up with the normal image object callback */
+
+ rbd_img_obj_callback(obj_request);
+}
+
+static void
+rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+{
+ struct rbd_obj_request *orig_request;
+ struct ceph_osd_request *osd_req;
+ struct ceph_osd_client *osdc;
+ struct rbd_device *rbd_dev;
+ struct page **pages;
+ int result;
+ u64 obj_size;
+ u64 xferred;
+
+ rbd_assert(img_request_child_test(img_request));
+
+ /* First get what we need from the image request */
+
+ pages = img_request->copyup_pages;
+ rbd_assert(pages != NULL);
+ img_request->copyup_pages = NULL;
+
+ orig_request = img_request->obj_request;
+ rbd_assert(orig_request != NULL);
+ rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+ result = img_request->result;
+ obj_size = img_request->length;
+ xferred = img_request->xferred;
+
+ rbd_dev = img_request->rbd_dev;
+ rbd_assert(rbd_dev);
+ rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
+
+ rbd_img_request_put(img_request);
+
+ if (result)
+ goto out_err;
+
+ /* Allocate the new copyup osd request for the original request */
+
+ result = -ENOMEM;
+ rbd_assert(!orig_request->osd_req);
+ osd_req = rbd_osd_req_create_copyup(orig_request);
+ if (!osd_req)
+ goto out_err;
+ orig_request->osd_req = osd_req;
+ orig_request->copyup_pages = pages;
+
+ /* Initialize the copyup op */
+
+ osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
+ osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+ false, false);
+
+ /* Then the original write request op */
+
+ osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
+ orig_request->offset,
+ orig_request->length, 0, 0);
+ osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
+ orig_request->length);
+
+ rbd_osd_req_format_write(orig_request);
+
+ /* All set, send it off. */
+
+ orig_request->callback = rbd_img_obj_copyup_callback;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ result = rbd_obj_request_submit(osdc, orig_request);
+ if (!result)
+ return;
+out_err:
+ /* Record the error code and complete the request */
+
+ orig_request->result = result;
+ orig_request->xferred = 0;
+ obj_request_done_set(orig_request);
+ rbd_obj_request_complete(orig_request);
+}
+
+/*
+ * Read from the parent image the range of data that covers the
+ * entire target of the given object request. This is used for
+ * satisfying a layered image write request when the target of an
+ * object request from the image request does not exist.
+ *
+ * A page array big enough to hold the returned data is allocated
+ * and supplied to rbd_img_request_fill() as the "data descriptor."
+ * When the read completes, this page array will be transferred to
+ * the original object request for the copyup operation.
+ *
+ * If an error occurs, record it as the result of the original
+ * object request and mark it done so it gets completed.
+ */
+static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request = NULL;
+ struct rbd_img_request *parent_request = NULL;
+ struct rbd_device *rbd_dev;
+ u64 img_offset;
+ u64 length;
+ struct page **pages = NULL;
+ u32 page_count;
+ int result;
+
+ rbd_assert(obj_request_img_data_test(obj_request));
+ rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+ img_request = obj_request->img_request;
+ rbd_assert(img_request != NULL);
+ rbd_dev = img_request->rbd_dev;
+ rbd_assert(rbd_dev->parent != NULL);
+
+ /*
+ * First things first. The original osd request is of no
+ * use to use any more, we'll need a new one that can hold
+ * the two ops in a copyup request. We'll get that later,
+ * but for now we can release the old one.
+ */
+ rbd_osd_req_destroy(obj_request->osd_req);
+ obj_request->osd_req = NULL;
+
+ /*
+ * Determine the byte range covered by the object in the
+ * child image to which the original request was to be sent.
+ */
+ img_offset = obj_request->img_offset - obj_request->offset;
+ length = (u64)1 << rbd_dev->header.obj_order;
+
+ /*
+ * Allocate a page array big enough to receive the data read
+ * from the parent.
+ */
+ page_count = (u32)calc_pages_for(0, length);
+ pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+ if (IS_ERR(pages)) {
+ result = PTR_ERR(pages);
+ pages = NULL;
+ goto out_err;
+ }
+
+ result = -ENOMEM;
+ parent_request = rbd_img_request_create(rbd_dev->parent,
+ img_offset, length,
+ false, true);
+ if (!parent_request)
+ goto out_err;
+ rbd_obj_request_get(obj_request);
+ parent_request->obj_request = obj_request;
+
+ result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
+ if (result)
+ goto out_err;
+ parent_request->copyup_pages = pages;
+
+ parent_request->callback = rbd_img_obj_parent_read_full_callback;
+ result = rbd_img_request_submit(parent_request);
+ if (!result)
+ return 0;
+
+ parent_request->copyup_pages = NULL;
+ parent_request->obj_request = NULL;
+ rbd_obj_request_put(obj_request);
+out_err:
+ if (pages)
+ ceph_release_page_vector(pages, page_count);
+ if (parent_request)
+ rbd_img_request_put(parent_request);
+ obj_request->result = result;
+ obj_request->xferred = 0;
+ obj_request_done_set(obj_request);
+
+ return result;
+}
+
+static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
+{
+ struct rbd_obj_request *orig_request;
+ int result;
+
+ rbd_assert(!obj_request_img_data_test(obj_request));
+
+ /*
+ * All we need from the object request is the original
+ * request and the result of the STAT op. Grab those, then
+ * we're done with the request.
+ */
+ orig_request = obj_request->obj_request;
+ obj_request->obj_request = NULL;
+ rbd_assert(orig_request);
+ rbd_assert(orig_request->img_request);
+
+ result = obj_request->result;
+ obj_request->result = 0;
+
+ dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
+ obj_request, orig_request, result,
+ obj_request->xferred, obj_request->length);
+ rbd_obj_request_put(obj_request);
+
+ rbd_assert(orig_request);
+ rbd_assert(orig_request->img_request);
+
+ /*
+ * Our only purpose here is to determine whether the object
+ * exists, and we don't want to treat the non-existence as
+ * an error. If something else comes back, transfer the
+ * error to the original request and complete it now.
+ */
+ if (!result) {
+ obj_request_existence_set(orig_request, true);
+ } else if (result == -ENOENT) {
+ obj_request_existence_set(orig_request, false);
+ } else if (result) {
+ orig_request->result = result;
+ goto out;
+ }
+
+ /*
+ * Resubmit the original request now that we have recorded
+ * whether the target object exists.
+ */
+ orig_request->result = rbd_img_obj_request_submit(orig_request);
+out:
+ if (orig_request->result)
+ rbd_obj_request_complete(orig_request);
+ rbd_obj_request_put(orig_request);
+}
+
+static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
+{
+ struct rbd_obj_request *stat_request;
+ struct rbd_device *rbd_dev;
+ struct ceph_osd_client *osdc;
+ struct page **pages = NULL;
+ u32 page_count;
+ size_t size;
+ int ret;
+
+ /*
+ * The response data for a STAT call consists of:
+ * le64 length;
+ * struct {
+ * le32 tv_sec;
+ * le32 tv_nsec;
+ * } mtime;
+ */
+ size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
+ page_count = (u32)calc_pages_for(0, size);
+ pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ ret = -ENOMEM;
+ stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+ OBJ_REQUEST_PAGES);
+ if (!stat_request)
+ goto out;
+
+ rbd_obj_request_get(obj_request);
+ stat_request->obj_request = obj_request;
+ stat_request->pages = pages;
+ stat_request->page_count = page_count;
+
+ rbd_assert(obj_request->img_request);
+ rbd_dev = obj_request->img_request->rbd_dev;
+ stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+ stat_request);
+ if (!stat_request->osd_req)
+ goto out;
+ stat_request->callback = rbd_img_obj_exists_callback;
+
+ osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
+ osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+ false, false);
+ rbd_osd_req_format_read(stat_request);
+
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ ret = rbd_obj_request_submit(osdc, stat_request);
+out:
+ if (ret)
+ rbd_obj_request_put(obj_request);
+
+ return ret;
+}
+
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request;
+ bool known;
+
+ rbd_assert(obj_request_img_data_test(obj_request));
+
+ img_request = obj_request->img_request;
+ rbd_assert(img_request);
+
+ /*
+ * Only layered writes need special handling. If it's not a
+ * layered write, or it is a layered write but we know the
+ * target object exists, it's no different from any other
+ * object request.
+ */
+ if (!img_request_write_test(img_request) ||
+ !img_request_layered_test(img_request) ||
+ ((known = obj_request_known_test(obj_request)) &&
+ obj_request_exists_test(obj_request))) {
+
+ struct rbd_device *rbd_dev;
+ struct ceph_osd_client *osdc;
+
+ rbd_dev = obj_request->img_request->rbd_dev;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+
+ return rbd_obj_request_submit(osdc, obj_request);
+ }
+
+ /*
+ * It's a layered write. The target object might exist but
+ * we may not know that yet. If we know it doesn't exist,
+ * start by reading the data for the full target object from
+ * the parent so we can use it for a copyup to the target.
+ */
+ if (known)
+ return rbd_img_obj_parent_read_full(obj_request);
+
+ /* We don't know whether the target exists. Go find out. */
+
+ return rbd_img_obj_exists_submit(obj_request);
+}
+
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
- struct rbd_device *rbd_dev = img_request->rbd_dev;
- struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
struct rbd_obj_request *next_obj_request;
for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
int ret;
- ret = rbd_obj_request_submit(osdc, obj_request);
+ ret = rbd_img_obj_request_submit(obj_request);
if (ret)
return ret;
- /*
- * The image request has its own reference to each
- * of its object requests, so we can safely drop the
- * initial one here.
- */
- rbd_obj_request_put(obj_request);
}
return 0;
}
+static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
+{
+ struct rbd_obj_request *obj_request;
+
+ rbd_assert(img_request_child_test(img_request));
+
+ obj_request = img_request->obj_request;
+ rbd_assert(obj_request != NULL);
+ obj_request->result = img_request->result;
+ obj_request->xferred = img_request->xferred;
+
+ rbd_img_obj_request_read_callback(obj_request);
+ rbd_obj_request_complete(obj_request);
+}
+
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
+{
+ struct rbd_device *rbd_dev;
+ struct rbd_img_request *img_request;
+ int result;
+
+ rbd_assert(obj_request_img_data_test(obj_request));
+ rbd_assert(obj_request->img_request != NULL);
+ rbd_assert(obj_request->result == (s32) -ENOENT);
+ rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+ rbd_dev = obj_request->img_request->rbd_dev;
+ rbd_assert(rbd_dev->parent != NULL);
+ /* rbd_read_finish(obj_request, obj_request->length); */
+ img_request = rbd_img_request_create(rbd_dev->parent,
+ obj_request->img_offset,
+ obj_request->length,
+ false, true);
+ result = -ENOMEM;
+ if (!img_request)
+ goto out_err;
+
+ rbd_obj_request_get(obj_request);
+ img_request->obj_request = obj_request;
+
+ result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+ obj_request->bio_list);
+ if (result)
+ goto out_err;
+
+ img_request->callback = rbd_img_parent_read_callback;
+ result = rbd_img_request_submit(img_request);
+ if (result)
+ goto out_err;
+
+ return;
+out_err:
+ if (img_request)
+ rbd_img_request_put(img_request);
+ obj_request->result = result;
+ obj_request->xferred = 0;
+ obj_request_done_set(obj_request);
+}
+
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
u64 ver, u64 notify_id)
{
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
notify_id, ver, 0);
- rbd_osd_req_format(obj_request, false);
+ rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
out:
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie,
rbd_dev->header.obj_version, start);
- rbd_osd_req_format(obj_request, true);
+ rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
obj_request->pages, inbound_size,
0, false, false);
- rbd_osd_req_format(obj_request, false);
+ rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
img_request->rq = rq;
- result = rbd_img_request_fill_bio(img_request, rq->bio);
+ result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+ rq->bio);
if (!result)
result = rbd_img_request_submit(img_request);
if (result)
osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
offset, length, 0, 0);
- osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
+ osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
obj_request->pages,
obj_request->length,
obj_request->offset & ~PAGE_MASK,
false, false);
- rbd_osd_req_format(obj_request, false);
+ rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
else
ret = rbd_dev_v2_refresh(rbd_dev, hver);
mutex_unlock(&ctl_mutex);
+ revalidate_disk(rbd_dev->disk);
return ret;
}
return NULL;
kref_init(&spec->kref);
- rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
-
return spec;
}
void *response;
void *p;
+ /* If we already have it we don't need to look it up */
+
+ if (rbd_dev->spec->image_id)
+ return 0;
+
/*
* When probing a parent image, the image id is already
* known (and the image name likely is not). There's no
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
+ struct rbd_device *parent = NULL;
+ struct rbd_spec *parent_spec = NULL;
+ struct rbd_client *rbdc = NULL;
int ret;
/* no need to lock here, as rbd_dev is not registered yet */
* At this point cleanup in the event of an error is the job
* of the sysfs code (initiated by rbd_bus_del_dev()).
*/
+ /* Probe the parent if there is one */
+
+ if (rbd_dev->parent_spec) {
+ /*
+ * We need to pass a reference to the client and the
+ * parent spec when creating the parent rbd_dev.
+ * Images related by parent/child relationships
+ * always share both.
+ */
+ parent_spec = rbd_spec_get(rbd_dev->parent_spec);
+ rbdc = __rbd_get_client(rbd_dev->rbd_client);
+
+ parent = rbd_dev_create(rbdc, parent_spec);
+ if (!parent) {
+ ret = -ENOMEM;
+ goto err_out_spec;
+ }
+ rbdc = NULL; /* parent now owns reference */
+ parent_spec = NULL; /* parent now owns reference */
+ ret = rbd_dev_probe(parent);
+ if (ret < 0)
+ goto err_out_parent;
+ rbd_dev->parent = parent;
+ }
+
down_write(&rbd_dev->header_rwsem);
ret = rbd_dev_snaps_register(rbd_dev);
up_write(&rbd_dev->header_rwsem);
(unsigned long long) rbd_dev->mapping.size);
return ret;
+
+err_out_parent:
+ rbd_dev_destroy(parent);
+err_out_spec:
+ rbd_spec_put(parent_spec);
+ rbd_put_client(rbdc);
err_out_bus:
/* this will also clean up rest of rbd_dev stuff */
module_put(THIS_MODULE);
}
+static void __rbd_remove(struct rbd_device *rbd_dev)
+{
+ rbd_remove_all_snaps(rbd_dev);
+ rbd_bus_del_dev(rbd_dev);
+}
+
static ssize_t rbd_remove(struct bus_type *bus,
const char *buf,
size_t count)
if (ret < 0)
goto done;
- rbd_remove_all_snaps(rbd_dev);
- rbd_bus_del_dev(rbd_dev);
+ while (rbd_dev->parent_spec) {
+ struct rbd_device *first = rbd_dev;
+ struct rbd_device *second = first->parent;
+ struct rbd_device *third;
+
+ /*
+ * Follow to the parent with no grandparent and
+ * remove it.
+ */
+ while (second && (third = second->parent)) {
+ first = second;
+ second = third;
+ }
+ __rbd_remove(second);
+ rbd_spec_put(first->parent_spec);
+ first->parent_spec = NULL;
+ first->parent_overlap = 0;
+ first->parent = NULL;
+ }
+ __rbd_remove(rbd_dev);
done:
mutex_unlock(&ctl_mutex);