/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
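/*
 * With 510 entries, the snapshot id array consumes 510 * 8 = 4080
 * bytes, which leaves room for the rest of struct ceph_snap_context
 * within a single 4KB page.
 */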
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64
/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
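/*
 * Each byte of an int contributes at most log10(256) (about 2.41,
 * bounded above by 5/2) decimal digits, so (5 * sizeof (int)) / 2
 * digits always suffice; the "+ 1" leaves room for a sign.  For a
 * four-byte int this yields (5 * 4) / 2 + 1 = 11 characters, enough
 * for "-2147483648".
 */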
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	u64 features;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
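/*
 * Note that the "safe" variant walks the list in reverse.  Object
 * requests can only be torn down from the highest "which" value on
 * down: rbd_img_obj_request_del() below asserts that the request
 * being removed is the last one remaining in its image request.
 */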
struct rbd_snap {
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
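/*
 * For example, a mapping can be made read-only by including "ro"
 * (or "read_only") among the options in the sysfs add string; the
 * line below is illustrative only -- see
 * Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
 * syntax:
 *
 *   # echo "1.2.3.4:6789 name=admin,ro rbd foo -" > /sys/bus/rbd/add
 */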
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself in order
 * to remove the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */

static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
{
	struct ceph_snap_context *snapc;
	size_t size;

	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (snapc->snaps[0]);
	snapc = kzalloc(size, GFP_KERNEL);
	if (!snapc)
		return NULL;

	atomic_set(&snapc->nref, 1);
	snapc->num_snaps = snap_count;

	return snapc;
}

static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
	(void)ceph_get_snap_context(snapc);
}

static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
	ceph_put_snap_context(snapc);
}
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
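/*
 * Taken together, the two order checks above constrain
 * ondisk->options.order to the range [SECTOR_SHIFT, 31]: objects
 * are at least one sector (512 bytes), and small enough that
 * (1 << order) cannot overflow a 32-bit int.
 */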
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = rbd_snap_context_create(snap_count);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
					const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (!strcmp(snap_name, snap->name))
			return snap;

	return NULL;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
	} else {
		struct rbd_snap *snap;

		snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (!snap)
			return -ENOENT;
		rbd_dev->mapping.size = snap->size;
		rbd_dev->mapping.features = snap->features;
		rbd_dev->mapping.read_only = true;
	}

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	rbd_snap_context_put(header->snapc);
	header->snapc = NULL;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
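/*
 * Worked example, assuming the common object order of 22 (4MiB
 * objects): image offset 0x12345678 falls in segment
 * 0x12345678 >> 22 = 0x48, at byte offset 0x12345678 & 0x3fffff =
 * 0x345678 within that object, and rbd_segment_name() yields
 * "<object_prefix>.000000000048".
 */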
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return (u64) 1 << header->obj_order;
}

/*
 * bio helpers
 */
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
					bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
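/*
 * Example: asking for 1MB starting 512 bytes into a chain whose
 * first bio covers only 4KB yields a clone of the tail of that bio
 * chained to clones of (parts of) the following bios until the full
 * 1MB is covered.  *bio_src and *offset are left describing the
 * first byte past the cloned range, ready for a subsequent call.
 */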
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that two existence checks are
 * separated by the creation of the target object, so that the
 * first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the response that
 * arrives later.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; it's not clear offhand which way is better.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
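/*
 * Note that for the two-op copyup requests built by
 * rbd_osd_req_create_copyup() below, r_ops[0] is the
 * CEPH_OSD_OP_CALL method invocation, so the dispatch above routes
 * their completion through rbd_osd_trivial_callback().
 */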
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
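/*
 * Reads are executed against a single snapshot id (CEPH_NOSNAP for
 * the head), while writes carry the full snapshot context so the
 * OSDs can preserve the data of any existing snapshots before
 * overwriting.
 */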
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		rbd_snap_context_get(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		rbd_snap_context_put(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kfree(img_request);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
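/*
 * Worked example, assuming 4MiB (order 22) objects: a 6MiB write
 * starting at image offset 3MiB becomes three object requests:
 * bytes 3MiB..4MiB of object 0, all of object 1, and bytes
 * 0..1MiB of object 2.
 */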
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	int result;
	u64 obj_size;
	u64 xferred;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
	result = img_request->result;
	obj_size = img_request->length;
	xferred = img_request->xferred;

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

	rbd_img_request_put(img_request);

	if (result)
		goto out_err;

	/* Allocate the new copyup osd request for the original request */

	result = -ENOMEM;
	rbd_assert(!orig_request->osd_req);
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
						false, false);

	/* Then the original write request op */

	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					orig_request->offset,
					orig_request->length, 0, 0);
	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
					orig_request->length);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	result = rbd_obj_request_submit(osdc, orig_request);
	if (!result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}
/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * First things first.  The original osd request is of no
	 * use to us any more, we'll need a new one that can hold
	 * the two ops in a copyup request.  We'll get that later,
	 * but for now we can release the old one.
	 */
	rbd_osd_req_destroy(obj_request->osd_req);
	obj_request->osd_req = NULL;

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length,
						false, true);
	if (!parent_request)
		goto out_err;
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}
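/*
 * To summarize the copyup sequence for a layered write whose target
 * object does not yet exist: the original write's osd request is
 * discarded; the covering range is read from the parent image into
 * a page array (above); and when that read completes, a two-op osd
 * request is issued that first invokes the "copyup" class method
 * with the parent data and then performs the original write.
 */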
static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
	rbd_obj_request_put(orig_request);
}
2303 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2305 struct rbd_obj_request *stat_request;
2306 struct rbd_device *rbd_dev;
2307 struct ceph_osd_client *osdc;
2308 struct page **pages = NULL;
2314 * The response data for a STAT call consists of:
2315 *     le64 length;
2316 *     struct {
2317 *         le32 tv_sec;
2318 *         le32 tv_nsec;
2319 *     } mtime;
2321 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2322 page_count = (u32)calc_pages_for(0, size);
2323 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2325 return PTR_ERR(pages);
2328 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2333 rbd_obj_request_get(obj_request);
2334 stat_request->obj_request = obj_request;
2335 stat_request->pages = pages;
2336 stat_request->page_count = page_count;
2338 rbd_assert(obj_request->img_request);
2339 rbd_dev = obj_request->img_request->rbd_dev;
2340 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2342 if (!stat_request->osd_req)
2344 stat_request->callback = rbd_img_obj_exists_callback;
2346 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2347 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2349 rbd_osd_req_format_read(stat_request);
2351 osdc = &rbd_dev->rbd_client->client->osdc;
2352 ret = rbd_obj_request_submit(osdc, stat_request);
2355 rbd_obj_request_put(obj_request);
2360 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2362 struct rbd_img_request *img_request;
2363 struct rbd_device *rbd_dev;
2366 rbd_assert(obj_request_img_data_test(obj_request));
2368 img_request = obj_request->img_request;
2369 rbd_assert(img_request);
2370 rbd_dev = img_request->rbd_dev;
2373 * Only writes to layered images need special handling.
2374 * Reads and non-layered writes are simple object requests.
2375 * Layered writes that start beyond the end of the overlap
2376 * with the parent have no parent data, so they too are
2377 * simple object requests. Finally, if the target object is
2378 * known to already exist, its parent data has already been
2379 * copied, so a write to the object can also be handled as a
2380 * simple object request.
2382 if (!img_request_write_test(img_request) ||
2383 !img_request_layered_test(img_request) ||
2384 rbd_dev->parent_overlap <= obj_request->img_offset ||
2385 ((known = obj_request_known_test(obj_request)) &&
2386 obj_request_exists_test(obj_request))) {
2388 struct rbd_device *rbd_dev;
2389 struct ceph_osd_client *osdc;
2391 rbd_dev = obj_request->img_request->rbd_dev;
2392 osdc = &rbd_dev->rbd_client->client->osdc;
2394 return rbd_obj_request_submit(osdc, obj_request);
2398 * It's a layered write. The target object might exist but
2399 * we may not know that yet. If we know it doesn't exist,
2400 * start by reading the data for the full target object from
2401 * the parent so we can use it for a copyup to the target.
2404 return rbd_img_obj_parent_read_full(obj_request);
2406 /* We don't know whether the target exists. Go find out. */
2408 return rbd_img_obj_exists_submit(obj_request);
2411 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2413 struct rbd_obj_request *obj_request;
2414 struct rbd_obj_request *next_obj_request;
2416 dout("%s: img %p\n", __func__, img_request);
2417 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2420 ret = rbd_img_obj_request_submit(obj_request);
2428 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2430 struct rbd_obj_request *obj_request;
2431 struct rbd_device *rbd_dev;
2434 rbd_assert(img_request_child_test(img_request));
2436 obj_request = img_request->obj_request;
2437 rbd_assert(obj_request);
2438 rbd_assert(obj_request->img_request);
2440 obj_request->result = img_request->result;
2441 if (obj_request->result)
2445 * We need to zero anything beyond the parent overlap
2446 * boundary. Since rbd_img_obj_request_read_callback()
2447 * will zero anything beyond the end of a short read, an
2448 * easy way to do this is to pretend the data from the
2449 * parent came up short--ending at the overlap boundary.
2451 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2452 obj_end = obj_request->img_offset + obj_request->length;
2453 rbd_dev = obj_request->img_request->rbd_dev;
2454 if (obj_end > rbd_dev->parent_overlap) {
2457 if (obj_request->img_offset < rbd_dev->parent_overlap)
2458 xferred = rbd_dev->parent_overlap -
2459 obj_request->img_offset;
2461 obj_request->xferred = min(img_request->xferred, xferred);
2463 obj_request->xferred = img_request->xferred;
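/*
 * Illustration of the clamp above (hypothetical numbers): if the
 * object covers image bytes [8 MiB, 12 MiB) and parent_overlap is
 * 10 MiB, obj_end (12 MiB) exceeds the overlap, xferred is limited
 * to 10 MiB - 8 MiB = 2 MiB, and the read callback below zeros the
 * remaining 2 MiB as if the parent read had come up short.
 */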
2466 rbd_img_obj_request_read_callback(obj_request);
2467 rbd_obj_request_complete(obj_request);
2470 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2472 struct rbd_device *rbd_dev;
2473 struct rbd_img_request *img_request;
2476 rbd_assert(obj_request_img_data_test(obj_request));
2477 rbd_assert(obj_request->img_request != NULL);
2478 rbd_assert(obj_request->result == (s32) -ENOENT);
2479 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2481 rbd_dev = obj_request->img_request->rbd_dev;
2482 rbd_assert(rbd_dev->parent != NULL);
2483 /* rbd_read_finish(obj_request, obj_request->length); */
2484 img_request = rbd_img_request_create(rbd_dev->parent,
2485 obj_request->img_offset,
2486 obj_request->length,
2492 rbd_obj_request_get(obj_request);
2493 img_request->obj_request = obj_request;
2495 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2496 obj_request->bio_list);
2500 img_request->callback = rbd_img_parent_read_callback;
2501 result = rbd_img_request_submit(img_request);
2508 rbd_img_request_put(img_request);
2509 obj_request->result = result;
2510 obj_request->xferred = 0;
2511 obj_request_done_set(obj_request);
2514 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2515 u64 ver, u64 notify_id)
2517 struct rbd_obj_request *obj_request;
2518 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2521 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2522 OBJ_REQUEST_NODATA);
2527 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2528 if (!obj_request->osd_req)
2530 obj_request->callback = rbd_obj_request_put;
2532 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2534 rbd_osd_req_format_read(obj_request);
2536 ret = rbd_obj_request_submit(osdc, obj_request);
2539 rbd_obj_request_put(obj_request);
2544 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2546 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2552 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2553 rbd_dev->header_name, (unsigned long long) notify_id,
2554 (unsigned int) opcode);
2555 (void)rbd_dev_refresh(rbd_dev, &hver);
2557 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2561 * Request sync osd watch/unwatch. The value of "start" determines
2562 * whether a watch request is being initiated or torn down.
2564 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2566 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2567 struct rbd_obj_request *obj_request;
2570 rbd_assert(start ^ !!rbd_dev->watch_event);
2571 rbd_assert(start ^ !!rbd_dev->watch_request);
2574 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2575 &rbd_dev->watch_event);
2578 rbd_assert(rbd_dev->watch_event != NULL);
2582 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2583 OBJ_REQUEST_NODATA);
2587 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2588 if (!obj_request->osd_req)
2592 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2594 ceph_osdc_unregister_linger_request(osdc,
2595 rbd_dev->watch_request->osd_req);
2597 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2598 rbd_dev->watch_event->cookie,
2599 rbd_dev->header.obj_version, start);
2600 rbd_osd_req_format_write(obj_request);
2602 ret = rbd_obj_request_submit(osdc, obj_request);
2605 ret = rbd_obj_request_wait(obj_request);
2608 ret = obj_request->result;
2613 * A watch request is set to linger, so the underlying osd
2614 * request won't go away until we unregister it. We retain
2615 * a pointer to the object request during that time (in
2616 * rbd_dev->watch_request), so we'll keep a reference to
2617 * it. We'll drop that reference (below) after we've
2618 * unregistered it.
2621 rbd_dev->watch_request = obj_request;
2626 /* We have successfully torn down the watch request */
2628 rbd_obj_request_put(rbd_dev->watch_request);
2629 rbd_dev->watch_request = NULL;
2631 /* Cancel the event if we're tearing down, or on error */
2632 ceph_osdc_cancel_event(rbd_dev->watch_event);
2633 rbd_dev->watch_event = NULL;
2635 rbd_obj_request_put(obj_request);
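/*
 * Usage sketch: the probe path sets up the watch with
 * rbd_dev_header_watch_sync(rbd_dev, 1) (see rbd_dev_image_probe()
 * below); passing 0 instead tears it down, unregistering the
 * lingering osd request and cancelling the event.
 */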
2641 * Synchronous osd object method call. Returns the number of bytes
2642 * returned in the inbound buffer, or a negative error code.
2644 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2645 const char *object_name,
2646 const char *class_name,
2647 const char *method_name,
2648 const void *outbound,
2649 size_t outbound_size,
2651 size_t inbound_size,
2654 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2655 struct rbd_obj_request *obj_request;
2656 struct page **pages;
2661 * Method calls are ultimately read operations. The result
2662 * should be placed into the inbound buffer provided. They
2663 * also supply outbound data--parameters for the object
2664 * method. Currently if this is present it will be a
2665 * snapshot id.
2667 page_count = (u32)calc_pages_for(0, inbound_size);
2668 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2670 return PTR_ERR(pages);
2673 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2678 obj_request->pages = pages;
2679 obj_request->page_count = page_count;
2681 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2682 if (!obj_request->osd_req)
2685 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2686 class_name, method_name);
2687 if (outbound_size) {
2688 struct ceph_pagelist *pagelist;
2690 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2694 ceph_pagelist_init(pagelist);
2695 ceph_pagelist_append(pagelist, outbound, outbound_size);
2696 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2699 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2700 obj_request->pages, inbound_size,
2702 rbd_osd_req_format_read(obj_request);
2704 ret = rbd_obj_request_submit(osdc, obj_request);
2707 ret = rbd_obj_request_wait(obj_request);
2711 ret = obj_request->result;
2715 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2716 ret = (int)obj_request->xferred;
2717 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2719 *version = obj_request->version;
2722 rbd_obj_request_put(obj_request);
2724 ceph_release_page_vector(pages, page_count);
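/*
 * A minimal call sketch, mirroring the "get_size" user further
 * down (variable names here are illustrative only):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf), NULL);
 *	if (ret >= (int) sizeof (size_buf))
 *		size = le64_to_cpu(size_buf.size);
 */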
2729 static void rbd_request_fn(struct request_queue *q)
2730 __releases(q->queue_lock) __acquires(q->queue_lock)
2732 struct rbd_device *rbd_dev = q->queuedata;
2733 bool read_only = rbd_dev->mapping.read_only;
2737 while ((rq = blk_fetch_request(q))) {
2738 bool write_request = rq_data_dir(rq) == WRITE;
2739 struct rbd_img_request *img_request;
2743 /* Ignore any non-FS requests that filter through. */
2745 if (rq->cmd_type != REQ_TYPE_FS) {
2746 dout("%s: non-fs request type %d\n", __func__,
2747 (int) rq->cmd_type);
2748 __blk_end_request_all(rq, 0);
2752 /* Ignore/skip any zero-length requests */
2754 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2755 length = (u64) blk_rq_bytes(rq);
2758 dout("%s: zero-length request\n", __func__);
2759 __blk_end_request_all(rq, 0);
2763 spin_unlock_irq(q->queue_lock);
2765 /* Disallow writes to a read-only device */
2767 if (write_request) {
2771 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2775 * Quit early if the mapped snapshot no longer
2776 * exists. It's still possible the snapshot will
2777 * have disappeared by the time our request arrives
2778 * at the osd, but there's no sense in sending it if
2779 * we already know.
2781 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2782 dout("request for non-existent snapshot");
2783 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2789 if (offset && length > U64_MAX - offset + 1) {
2790 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2792 goto end_request; /* Shouldn't happen */
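/*
 * Why the check above catches wraparound: for offset > 0, the byte
 * range [offset, offset + length) runs past U64_MAX exactly when
 * length > U64_MAX - offset + 1. E.g. offset = U64_MAX - 1 gives
 * U64_MAX - offset + 1 = 2, so any length > 2 would wrap.
 */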
2796 img_request = rbd_img_request_create(rbd_dev, offset, length,
2797 write_request, false);
2801 img_request->rq = rq;
2803 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2806 result = rbd_img_request_submit(img_request);
2808 rbd_img_request_put(img_request);
2810 spin_lock_irq(q->queue_lock);
2812 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2813 write_request ? "write" : "read",
2814 length, offset, result);
2816 __blk_end_request_all(rq, result);
2822 * A queue callback. Makes sure that we don't create a bio that
2823 * spans multiple osd objects. One exception would be single-page
2824 * bios, which we handle later at bio_chain_clone_range().
2826 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2827 struct bio_vec *bvec)
2829 struct rbd_device *rbd_dev = q->queuedata;
2830 sector_t sector_offset;
2831 sector_t sectors_per_obj;
2832 sector_t obj_sector_offset;
2836 * Find how far into its rbd object the bio's start sector
2837 * falls; the bio sector is partition-relative, so first make it
2838 * an offset relative to the enclosing device.
2840 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2841 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2842 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2845 * Compute the number of bytes from that offset to the end
2846 * of the object. Account for what's already used by the bio.
2848 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2849 if (ret > bmd->bi_size)
2850 ret -= bmd->bi_size;
2855 * Don't send back more than was asked for. And if the bio
2856 * was empty, let the whole thing through because: "Note
2857 * that a block device *must* allow a single page to be
2858 * added to an empty bio."
2860 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2861 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2862 ret = (int) bvec->bv_len;
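/*
 * Worked example (illustrative): with obj_order 22,
 * sectors_per_obj = 1 << (22 - 9) = 8192. A bio starting at device
 * sector 12288 has obj_sector_offset = 12288 & 8191 = 4096, so
 * (8192 - 4096) << 9 = 2 MiB remains before the object boundary,
 * less whatever the bio already holds (bi_size).
 */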
2867 static void rbd_free_disk(struct rbd_device *rbd_dev)
2869 struct gendisk *disk = rbd_dev->disk;
2874 rbd_dev->disk = NULL;
2875 if (disk->flags & GENHD_FL_UP) {
2878 blk_cleanup_queue(disk->queue);
2883 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2884 const char *object_name,
2885 u64 offset, u64 length,
2886 void *buf, u64 *version)
2889 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2890 struct rbd_obj_request *obj_request;
2891 struct page **pages = NULL;
2896 page_count = (u32) calc_pages_for(offset, length);
2897 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2899 ret = PTR_ERR(pages);
2902 obj_request = rbd_obj_request_create(object_name, offset, length,
2907 obj_request->pages = pages;
2908 obj_request->page_count = page_count;
2910 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2911 if (!obj_request->osd_req)
2914 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2915 offset, length, 0, 0);
2916 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2918 obj_request->length,
2919 obj_request->offset & ~PAGE_MASK,
2921 rbd_osd_req_format_read(obj_request);
2923 ret = rbd_obj_request_submit(osdc, obj_request);
2926 ret = rbd_obj_request_wait(obj_request);
2930 ret = obj_request->result;
2934 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2935 size = (size_t) obj_request->xferred;
2936 ceph_copy_from_page_vector(pages, buf, 0, size);
2937 rbd_assert(size <= (size_t) INT_MAX);
2940 *version = obj_request->version;
2943 rbd_obj_request_put(obj_request);
2945 ceph_release_page_vector(pages, page_count);
2951 * Read the complete header for the given rbd device.
2953 * Returns a pointer to a dynamically-allocated buffer containing
2954 * the complete and validated header. Caller can pass the address
2955 * of a variable that will be filled in with the version of the
2956 * header object at the time it was read.
2958 * Returns a pointer-coded errno if a failure occurs.
2960 static struct rbd_image_header_ondisk *
2961 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2963 struct rbd_image_header_ondisk *ondisk = NULL;
2970 * The complete header will include an array of its 64-bit
2971 * snapshot ids, followed by the names of those snapshots as
2972 * a contiguous block of NUL-terminated strings. Note that
2973 * the number of snapshots could change by the time we read
2974 * it in, in which case we re-read it.
2981 size = sizeof (*ondisk);
2982 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2984 ondisk = kmalloc(size, GFP_KERNEL);
2986 return ERR_PTR(-ENOMEM);
2988 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2989 0, size, ondisk, version);
2992 if ((size_t)ret < size) {
2994 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2998 if (!rbd_dev_ondisk_valid(ondisk)) {
3000 rbd_warn(rbd_dev, "invalid header");
3004 names_size = le64_to_cpu(ondisk->snap_names_len);
3005 want_count = snap_count;
3006 snap_count = le32_to_cpu(ondisk->snap_count);
3007 } while (snap_count != want_count);
3014 return ERR_PTR(ret);
3018 * Reload the on-disk header.
3020 static int rbd_read_header(struct rbd_device *rbd_dev,
3021 struct rbd_image_header *header)
3023 struct rbd_image_header_ondisk *ondisk;
3027 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3029 return PTR_ERR(ondisk);
3030 ret = rbd_header_from_disk(header, ondisk);
3032 header->obj_version = ver;
3038 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3040 struct rbd_snap *snap;
3041 struct rbd_snap *next;
3043 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3044 list_del(&snap->node);
3045 rbd_snap_destroy(snap);
3049 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3051 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3054 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3057 rbd_dev->mapping.size = rbd_dev->header.image_size;
3058 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3059 dout("setting size to %llu sectors", (unsigned long long)size);
3060 set_capacity(rbd_dev->disk, size);
3065 * Only read the first part of the on-disk header, without the snaps info.
3067 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3070 struct rbd_image_header h;
3072 ret = rbd_read_header(rbd_dev, &h);
3076 down_write(&rbd_dev->header_rwsem);
3078 /* Update image size, and check for resize of mapped image */
3079 rbd_dev->header.image_size = h.image_size;
3080 rbd_update_mapping_size(rbd_dev);
3082 /* rbd_dev->header.object_prefix shouldn't change */
3083 kfree(rbd_dev->header.snap_sizes);
3084 kfree(rbd_dev->header.snap_names);
3085 /* osd requests may still refer to snapc */
3086 rbd_snap_context_put(rbd_dev->header.snapc);
3089 *hver = h.obj_version;
3090 rbd_dev->header.obj_version = h.obj_version;
3091 rbd_dev->header.image_size = h.image_size;
3092 rbd_dev->header.snapc = h.snapc;
3093 rbd_dev->header.snap_names = h.snap_names;
3094 rbd_dev->header.snap_sizes = h.snap_sizes;
3095 /* Free the extra copy of the object prefix */
3096 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3097 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3098 kfree(h.object_prefix);
3100 ret = rbd_dev_snaps_update(rbd_dev);
3102 up_write(&rbd_dev->header_rwsem);
3107 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3111 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3112 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3113 if (rbd_dev->image_format == 1)
3114 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3116 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3117 mutex_unlock(&ctl_mutex);
3118 revalidate_disk(rbd_dev->disk);
3120 rbd_warn(rbd_dev, "got notification but failed to "
3121 "update snaps: %d\n", ret);
3126 static int rbd_init_disk(struct rbd_device *rbd_dev)
3128 struct gendisk *disk;
3129 struct request_queue *q;
3132 /* create gendisk info */
3133 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3137 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3139 disk->major = rbd_dev->major;
3140 disk->first_minor = 0;
3141 disk->fops = &rbd_bd_ops;
3142 disk->private_data = rbd_dev;
3144 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3148 /* We use the default size, but let's be explicit about it. */
3149 blk_queue_physical_block_size(q, SECTOR_SIZE);
3151 /* set io sizes to object size */
3152 segment_size = rbd_obj_bytes(&rbd_dev->header);
3153 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3154 blk_queue_max_segment_size(q, segment_size);
3155 blk_queue_io_min(q, segment_size);
3156 blk_queue_io_opt(q, segment_size);
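/*
 * For example, with the common default object order of 22 (an
 * assumption about the image, not something checked here),
 * segment_size is 4 MiB, max_hw_sectors becomes 4 MiB / 512 = 8192
 * sectors, and all the I/O size hints match the object size.
 */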
3158 blk_queue_merge_bvec(q, rbd_merge_bvec);
3161 q->queuedata = rbd_dev;
3163 rbd_dev->disk = disk;
3176 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3178 return container_of(dev, struct rbd_device, dev);
3181 static ssize_t rbd_size_show(struct device *dev,
3182 struct device_attribute *attr, char *buf)
3184 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186 return sprintf(buf, "%llu\n",
3187 (unsigned long long)rbd_dev->mapping.size);
3191 * Note this shows the features for whatever's mapped, which is not
3192 * necessarily the base image.
3194 static ssize_t rbd_features_show(struct device *dev,
3195 struct device_attribute *attr, char *buf)
3197 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3199 return sprintf(buf, "0x%016llx\n",
3200 (unsigned long long)rbd_dev->mapping.features);
3203 static ssize_t rbd_major_show(struct device *dev,
3204 struct device_attribute *attr, char *buf)
3206 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3209 return sprintf(buf, "%d\n", rbd_dev->major);
3211 return sprintf(buf, "(none)\n");
3215 static ssize_t rbd_client_id_show(struct device *dev,
3216 struct device_attribute *attr, char *buf)
3218 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3220 return sprintf(buf, "client%lld\n",
3221 ceph_client_id(rbd_dev->rbd_client->client));
3224 static ssize_t rbd_pool_show(struct device *dev,
3225 struct device_attribute *attr, char *buf)
3227 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3232 static ssize_t rbd_pool_id_show(struct device *dev,
3233 struct device_attribute *attr, char *buf)
3235 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3237 return sprintf(buf, "%llu\n",
3238 (unsigned long long) rbd_dev->spec->pool_id);
3241 static ssize_t rbd_name_show(struct device *dev,
3242 struct device_attribute *attr, char *buf)
3244 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3246 if (rbd_dev->spec->image_name)
3247 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3249 return sprintf(buf, "(unknown)\n");
3252 static ssize_t rbd_image_id_show(struct device *dev,
3253 struct device_attribute *attr, char *buf)
3255 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3257 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3261 * Shows the name of the currently-mapped snapshot (or
3262 * RBD_SNAP_HEAD_NAME for the base image).
3264 static ssize_t rbd_snap_show(struct device *dev,
3265 struct device_attribute *attr,
3268 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3274 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3275 * for the parent image. If there is no parent, simply shows
3276 * "(no parent image)".
3278 static ssize_t rbd_parent_show(struct device *dev,
3279 struct device_attribute *attr,
3282 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3283 struct rbd_spec *spec = rbd_dev->parent_spec;
3288 return sprintf(buf, "(no parent image)\n");
3290 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3291 (unsigned long long) spec->pool_id, spec->pool_name);
3296 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3297 spec->image_name ? spec->image_name : "(unknown)");
3302 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3303 (unsigned long long) spec->snap_id, spec->snap_name);
3308 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3313 return (ssize_t) (bufp - buf);
3316 static ssize_t rbd_image_refresh(struct device *dev,
3317 struct device_attribute *attr,
3321 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3324 ret = rbd_dev_refresh(rbd_dev, NULL);
3326 return ret < 0 ? ret : size;
3329 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3330 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3331 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3332 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3333 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3334 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3335 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3336 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3337 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3338 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3339 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3341 static struct attribute *rbd_attrs[] = {
3342 &dev_attr_size.attr,
3343 &dev_attr_features.attr,
3344 &dev_attr_major.attr,
3345 &dev_attr_client_id.attr,
3346 &dev_attr_pool.attr,
3347 &dev_attr_pool_id.attr,
3348 &dev_attr_name.attr,
3349 &dev_attr_image_id.attr,
3350 &dev_attr_current_snap.attr,
3351 &dev_attr_parent.attr,
3352 &dev_attr_refresh.attr,
3356 static struct attribute_group rbd_attr_group = {
3360 static const struct attribute_group *rbd_attr_groups[] = {
3365 static void rbd_sysfs_dev_release(struct device *dev)
3369 static struct device_type rbd_device_type = {
3371 .groups = rbd_attr_groups,
3372 .release = rbd_sysfs_dev_release,
3375 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3377 kref_get(&spec->kref);
3382 static void rbd_spec_free(struct kref *kref);
3383 static void rbd_spec_put(struct rbd_spec *spec)
3386 kref_put(&spec->kref, rbd_spec_free);
3389 static struct rbd_spec *rbd_spec_alloc(void)
3391 struct rbd_spec *spec;
3393 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3396 kref_init(&spec->kref);
3401 static void rbd_spec_free(struct kref *kref)
3403 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3405 kfree(spec->pool_name);
3406 kfree(spec->image_id);
3407 kfree(spec->image_name);
3408 kfree(spec->snap_name);
3412 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3413 struct rbd_spec *spec)
3415 struct rbd_device *rbd_dev;
3417 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3421 spin_lock_init(&rbd_dev->lock);
3423 INIT_LIST_HEAD(&rbd_dev->node);
3424 INIT_LIST_HEAD(&rbd_dev->snaps);
3425 init_rwsem(&rbd_dev->header_rwsem);
3427 rbd_dev->spec = spec;
3428 rbd_dev->rbd_client = rbdc;
3430 /* Initialize the layout used for all rbd requests */
3432 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3433 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3434 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3435 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3440 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3442 rbd_put_client(rbd_dev->rbd_client);
3443 rbd_spec_put(rbd_dev->spec);
3447 static void rbd_snap_destroy(struct rbd_snap *snap)
3453 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3454 const char *snap_name,
3455 u64 snap_id, u64 snap_size,
3458 struct rbd_snap *snap;
3460 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3462 return ERR_PTR(-ENOMEM);
3464 snap->name = snap_name;
3466 snap->size = snap_size;
3467 snap->features = snap_features;
3473 * Returns a dynamically-allocated snapshot name if successful, or a
3474 * pointer-coded error otherwise.
3476 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3477 u64 *snap_size, u64 *snap_features)
3482 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3484 /* Skip over names until we find the one we are looking for */
3486 snap_name = rbd_dev->header.snap_names;
3487 for (i = 0; i < which; i++)
3488 snap_name += strlen(snap_name) + 1;
3490 snap_name = kstrdup(snap_name, GFP_KERNEL);
3492 return ERR_PTR(-ENOMEM);
3494 *snap_size = rbd_dev->header.snap_sizes[which];
3495 *snap_features = 0; /* No features for v1 */
3501 * Get the size and object order for an image snapshot, or, if
3502 * snap_id is CEPH_NOSNAP, get this information for the base
3503 * image.
3505 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3506 u8 *order, u64 *snap_size)
3508 __le64 snapid = cpu_to_le64(snap_id);
3513 } __attribute__ ((packed)) size_buf = { 0 };
3515 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3517 &snapid, sizeof (snapid),
3518 &size_buf, sizeof (size_buf), NULL);
3519 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3522 if (ret < sizeof (size_buf))
3526 *order = size_buf.order;
3527 *snap_size = le64_to_cpu(size_buf.size);
3529 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3530 (unsigned long long)snap_id, (unsigned int)*order,
3531 (unsigned long long)*snap_size);
3536 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3538 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3539 &rbd_dev->header.obj_order,
3540 &rbd_dev->header.image_size);
3543 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3549 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3553 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3554 "rbd", "get_object_prefix", NULL, 0,
3555 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3556 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3561 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3562 p + ret, NULL, GFP_NOIO);
3565 if (IS_ERR(rbd_dev->header.object_prefix)) {
3566 ret = PTR_ERR(rbd_dev->header.object_prefix);
3567 rbd_dev->header.object_prefix = NULL;
3569 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3577 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3580 __le64 snapid = cpu_to_le64(snap_id);
3584 } __attribute__ ((packed)) features_buf = { 0 };
3588 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3589 "rbd", "get_features",
3590 &snapid, sizeof (snapid),
3591 &features_buf, sizeof (features_buf), NULL);
3592 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3595 if (ret < sizeof (features_buf))
3598 incompat = le64_to_cpu(features_buf.incompat);
3599 if (incompat & ~RBD_FEATURES_SUPPORTED)
3602 *snap_features = le64_to_cpu(features_buf.features);
3604 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3605 (unsigned long long)snap_id,
3606 (unsigned long long)*snap_features,
3607 (unsigned long long)le64_to_cpu(features_buf.incompat));
3612 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3614 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3615 &rbd_dev->header.features);
3618 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3620 struct rbd_spec *parent_spec;
3622 void *reply_buf = NULL;
3630 parent_spec = rbd_spec_alloc();
3634 size = sizeof (__le64) + /* pool_id */
3635 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3636 sizeof (__le64) + /* snap_id */
3637 sizeof (__le64); /* overlap */
3638 reply_buf = kmalloc(size, GFP_KERNEL);
3644 snapid = cpu_to_le64(CEPH_NOSNAP);
3645 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3646 "rbd", "get_parent",
3647 &snapid, sizeof (snapid),
3648 reply_buf, size, NULL);
3649 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3654 end = reply_buf + ret;
3656 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3657 if (parent_spec->pool_id == CEPH_NOPOOL)
3658 goto out; /* No parent? No problem. */
3660 /* The ceph file layout needs to fit pool id in 32 bits */
3663 if (parent_spec->pool_id > (u64)U32_MAX) {
3664 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3665 (unsigned long long)parent_spec->pool_id, U32_MAX);
3669 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3670 if (IS_ERR(image_id)) {
3671 ret = PTR_ERR(image_id);
3674 parent_spec->image_id = image_id;
3675 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3676 ceph_decode_64_safe(&p, end, overlap, out_err);
3678 rbd_dev->parent_overlap = overlap;
3679 rbd_dev->parent_spec = parent_spec;
3680 parent_spec = NULL; /* rbd_dev now owns this */
3685 rbd_spec_put(parent_spec);
3690 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3694 __le64 stripe_count;
3695 } __attribute__ ((packed)) striping_info_buf = { 0 };
3696 size_t size = sizeof (striping_info_buf);
3703 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3704 "rbd", "get_stripe_unit_count", NULL, 0,
3705 (char *)&striping_info_buf, size, NULL);
3706 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3713 * We don't actually support the "fancy striping" feature
3714 * (STRIPINGV2) yet, but if the striping sizes are the
3715 * defaults the behavior is the same as before. So find
3716 * out, and only fail if the image has non-default values.
3719 obj_size = (u64)1 << rbd_dev->header.obj_order;
3720 p = &striping_info_buf;
3721 stripe_unit = ceph_decode_64(&p);
3722 if (stripe_unit != obj_size) {
3723 rbd_warn(rbd_dev, "unsupported stripe unit "
3724 "(got %llu want %llu)",
3725 stripe_unit, obj_size);
3728 stripe_count = ceph_decode_64(&p);
3729 if (stripe_count != 1) {
3730 rbd_warn(rbd_dev, "unsupported stripe count "
3731 "(got %llu want 1)", stripe_count);
3734 rbd_dev->header.stripe_unit = stripe_unit;
3735 rbd_dev->header.stripe_count = stripe_count;
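/*
 * In other words, for an image with obj_order 22 only
 * stripe_unit == 4194304 and stripe_count == 1 -- the defaults,
 * which behave identically to no striping at all -- are accepted
 * here; any other values are reported and refused.
 */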
3740 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3742 size_t image_id_size;
3747 void *reply_buf = NULL;
3749 char *image_name = NULL;
3752 rbd_assert(!rbd_dev->spec->image_name);
3754 len = strlen(rbd_dev->spec->image_id);
3755 image_id_size = sizeof (__le32) + len;
3756 image_id = kmalloc(image_id_size, GFP_KERNEL);
3761 end = image_id + image_id_size;
3762 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3764 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3765 reply_buf = kmalloc(size, GFP_KERNEL);
3769 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3770 "rbd", "dir_get_name",
3771 image_id, image_id_size,
3772 reply_buf, size, NULL);
3776 end = reply_buf + ret;
3778 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3779 if (IS_ERR(image_name))
3782 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3791 * When an rbd image has a parent image, it is identified by the
3792 * pool, image, and snapshot ids (not names). This function fills
3793 * in the names for those ids. (It's OK if we can't figure out the
3794 * name for an image id, but the pool and snapshot ids should always
3795 * exist and have names.) All names in an rbd spec are dynamically
3796 * allocated.
3798 * When an image being mapped (not a parent) is probed, we have the
3799 * pool name and pool id, image name and image id, and the snapshot
3800 * name. The only thing we're missing is the snapshot id.
3802 * The set of snapshots for an image is not known until they have
3803 * been read by rbd_dev_snaps_update(), so we can't completely fill
3804 * in this information until after that has been called.
3806 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3808 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3809 struct rbd_spec *spec = rbd_dev->spec;
3810 const char *pool_name;
3811 const char *image_name;
3812 const char *snap_name;
3816 * An image being mapped will have the pool name (etc.), but
3817 * we need to look up the snapshot id.
3819 if (spec->pool_name) {
3820 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3821 struct rbd_snap *snap;
3823 snap = snap_by_name(rbd_dev, spec->snap_name);
3826 spec->snap_id = snap->id;
3828 spec->snap_id = CEPH_NOSNAP;
3834 /* Get the pool name; we have to make our own copy of this */
3836 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3838 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3841 pool_name = kstrdup(pool_name, GFP_KERNEL);
3845 /* Fetch the image name; tolerate failure here */
3847 image_name = rbd_dev_image_name(rbd_dev);
3849 rbd_warn(rbd_dev, "unable to get image name");
3851 /* Look up the snapshot name, and make a copy */
3853 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3855 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3859 snap_name = kstrdup(snap_name, GFP_KERNEL);
3865 spec->pool_name = pool_name;
3866 spec->image_name = image_name;
3867 spec->snap_name = snap_name;
3877 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3886 struct ceph_snap_context *snapc;
3890 * We'll need room for the seq value (maximum snapshot id),
3891 * snapshot count, and array of that many snapshot ids.
3892 * For now we have a fixed upper limit on the number we're
3893 * prepared to receive.
3895 size = sizeof (__le64) + sizeof (__le32) +
3896 RBD_MAX_SNAP_COUNT * sizeof (__le64);
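/*
 * That is, size = 8 + 4 + 510 * 8 = 4092 bytes at the driver's
 * 510-snapshot cap -- just under a single 4 KiB page.
 */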
3897 reply_buf = kzalloc(size, GFP_KERNEL);
3901 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3902 "rbd", "get_snapcontext", NULL, 0,
3903 reply_buf, size, ver);
3904 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3909 end = reply_buf + ret;
3911 ceph_decode_64_safe(&p, end, seq, out);
3912 ceph_decode_32_safe(&p, end, snap_count, out);
3915 * Make sure the reported number of snapshot ids wouldn't go
3916 * beyond the end of our buffer. But before checking that,
3917 * make sure the computed size of the snapshot context we
3918 * allocate is representable in a size_t.
3920 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3925 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3929 snapc = rbd_snap_context_create(snap_count);
3935 for (i = 0; i < snap_count; i++)
3936 snapc->snaps[i] = ceph_decode_64(&p);
3938 rbd_dev->header.snapc = snapc;
3940 dout(" snap context seq = %llu, snap_count = %u\n",
3941 (unsigned long long)seq, (unsigned int)snap_count);
3948 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3958 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3959 reply_buf = kmalloc(size, GFP_KERNEL);
3961 return ERR_PTR(-ENOMEM);
3963 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3964 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3965 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3966 "rbd", "get_snapshot_name",
3967 &snap_id, sizeof (snap_id),
3968 reply_buf, size, NULL);
3969 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3971 snap_name = ERR_PTR(ret);
3976 end = reply_buf + ret;
3977 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3978 if (IS_ERR(snap_name))
3981 dout(" snap_id 0x%016llx snap_name = %s\n",
3982 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3989 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3990 u64 *snap_size, u64 *snap_features)
3998 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3999 snap_id = rbd_dev->header.snapc->snaps[which];
4000 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
4004 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
4008 snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
4009 if (!IS_ERR(snap_name)) {
4011 *snap_features = features;
4016 return ERR_PTR(ret);
4019 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4020 u64 *snap_size, u64 *snap_features)
4022 if (rbd_dev->image_format == 1)
4023 return rbd_dev_v1_snap_info(rbd_dev, which,
4024 snap_size, snap_features);
4025 if (rbd_dev->image_format == 2)
4026 return rbd_dev_v2_snap_info(rbd_dev, which,
4027 snap_size, snap_features);
4028 return ERR_PTR(-EINVAL);
4031 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4035 down_write(&rbd_dev->header_rwsem);
4037 ret = rbd_dev_v2_image_size(rbd_dev);
4040 rbd_update_mapping_size(rbd_dev);
4042 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4043 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4046 ret = rbd_dev_snaps_update(rbd_dev);
4047 dout("rbd_dev_snaps_update returned %d\n", ret);
4051 up_write(&rbd_dev->header_rwsem);
4057 * Scan the rbd device's current snapshot list and compare it to the
4058 * newly-received snapshot context. Remove any existing snapshots
4059 * not present in the new snapshot context. Add a new snapshot for
4060 * any snapshots in the snapshot context not in the current list.
4061 * And verify there are no changes to snapshots we already know
4062 * about.
4064 * Assumes the snapshots in the snapshot context are sorted by
4065 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4066 * are also maintained in that order.)
4068 * Note that if any error occurs while updating the snapshot list,
4069 * the update is aborted and the entire list is cleared. The snapshot
4070 * list becomes inconsistent at that point anyway, so it might as
4071 * well be empty.
4073 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4075 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4076 const u32 snap_count = snapc->num_snaps;
4077 struct list_head *head = &rbd_dev->snaps;
4078 struct list_head *links = head->next;
4082 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4083 while (index < snap_count || links != head) {
4085 struct rbd_snap *snap;
4088 u64 snap_features = 0;
4090 snap_id = index < snap_count ? snapc->snaps[index]
4092 snap = links != head ? list_entry(links, struct rbd_snap, node)
4094 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4096 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4097 struct list_head *next = links->next;
4100 * A previously-existing snapshot is not in
4101 * the new snap context.
4103 * If the now-missing snapshot is the one
4104 * the image represents, clear its existence
4105 * flag so we can avoid sending any more
4106 * requests to it.
4108 if (rbd_dev->spec->snap_id == snap->id)
4109 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4110 dout("removing %ssnap id %llu\n",
4111 rbd_dev->spec->snap_id == snap->id ?
4113 (unsigned long long)snap->id);
4115 list_del(&snap->node);
4116 rbd_snap_destroy(snap);
4118 /* Done with this list entry; advance */
4124 snap_name = rbd_dev_snap_info(rbd_dev, index,
4125 &snap_size, &snap_features);
4126 if (IS_ERR(snap_name)) {
4127 ret = PTR_ERR(snap_name);
4128 dout("failed to get snap info, error %d\n", ret);
4132 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4133 (unsigned long long)snap_id);
4134 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4135 struct rbd_snap *new_snap;
4137 /* We haven't seen this snapshot before */
4139 new_snap = rbd_snap_create(rbd_dev, snap_name,
4140 snap_id, snap_size, snap_features);
4141 if (IS_ERR(new_snap)) {
4142 ret = PTR_ERR(new_snap);
4143 dout(" failed to add dev, error %d\n", ret);
4147 /* New goes before existing, or at end of list */
4149 dout(" added dev%s\n", snap ? "" : " at end\n");
4151 list_add_tail(&new_snap->node, &snap->node);
4153 list_add_tail(&new_snap->node, head);
4155 /* Already have this one */
4157 dout(" already present\n");
4159 rbd_assert(snap->size == snap_size);
4160 rbd_assert(!strcmp(snap->name, snap_name));
4161 rbd_assert(snap->features == snap_features);
4163 /* Done with this list entry; advance */
4165 links = links->next;
4168 /* Advance to the next entry in the snapshot context */
4172 dout("%s: done\n", __func__);
4176 rbd_remove_all_snaps(rbd_dev);
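/*
 * A short walk-through of the merge above, with hypothetical ids:
 * existing list (5, 3), new snap context (5, 4). Entry 5 matches
 * and is kept; context id 4 is absent from the list, so a new
 * snapshot is inserted ahead of 3; the context is then exhausted
 * while 3 remains, so 3 is removed. Result: (5, 4).
 */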
4181 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4186 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4188 dev = &rbd_dev->dev;
4189 dev->bus = &rbd_bus_type;
4190 dev->type = &rbd_device_type;
4191 dev->parent = &rbd_root_dev;
4192 dev->release = rbd_dev_device_release;
4193 dev_set_name(dev, "%d", rbd_dev->dev_id);
4194 ret = device_register(dev);
4196 mutex_unlock(&ctl_mutex);
4201 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4203 device_unregister(&rbd_dev->dev);
4206 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4209 * Get a unique rbd identifier for the given new rbd_dev, and add
4210 * the rbd_dev to the global list. The minimum rbd id is 1.
4212 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4214 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4216 spin_lock(&rbd_dev_list_lock);
4217 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4218 spin_unlock(&rbd_dev_list_lock);
4219 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4220 (unsigned long long) rbd_dev->dev_id);
4224 * Remove an rbd_dev from the global list, and record that its
4225 * identifier is no longer in use.
4227 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4229 struct list_head *tmp;
4230 int rbd_id = rbd_dev->dev_id;
4233 rbd_assert(rbd_id > 0);
4235 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4236 (unsigned long long) rbd_dev->dev_id);
4237 spin_lock(&rbd_dev_list_lock);
4238 list_del_init(&rbd_dev->node);
4241 * If the id being "put" is not the current maximum, there
4242 * is nothing special we need to do.
4244 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4245 spin_unlock(&rbd_dev_list_lock);
4250 * We need to update the current maximum id. Search the
4251 * list to find out what it is. We're more likely to find
4252 * the maximum at the end, so search the list backward.
4255 list_for_each_prev(tmp, &rbd_dev_list) {
4256 struct rbd_device *rbd_dev;
4258 rbd_dev = list_entry(tmp, struct rbd_device, node);
4259 if (rbd_dev->dev_id > max_id)
4260 max_id = rbd_dev->dev_id;
4262 spin_unlock(&rbd_dev_list_lock);
4265 * The max id could have been updated by rbd_dev_id_get(), in
4266 * which case it now accurately reflects the new maximum.
4267 * Be careful not to overwrite the maximum value in that
4268 * case.
4270 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4271 dout(" max dev id has been reset\n");
4275 * Skips over white space at *buf, and updates *buf to point to the
4276 * first found non-space character (if any). Returns the length of
4277 * the token (string of non-white space characters) found. Note
4278 * that *buf must be terminated with '\0'.
4280 static inline size_t next_token(const char **buf)
4283 * These are the characters that produce nonzero for
4284 * isspace() in the "C" and "POSIX" locales.
4286 const char *spaces = " \f\n\r\t\v";
4288 *buf += strspn(*buf, spaces); /* Find start of token */
4290 return strcspn(*buf, spaces); /* Return token length */
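/*
 * For example (hypothetical input): with *buf pointing at
 * "  rbd foo", next_token() advances *buf to "rbd foo" and
 * returns 3, the length of "rbd".
 */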
4294 * Finds the next token in *buf, and if the provided token buffer is
4295 * big enough, copies the found token into it. The result, if
4296 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4297 * must be terminated with '\0' on entry.
4299 * Returns the length of the token found (not including the '\0').
4300 * Return value will be 0 if no token is found, and it will be >=
4301 * token_size if the token would not fit.
4303 * The *buf pointer will be updated to point beyond the end of the
4304 * found token. Note that this occurs even if the token buffer is
4305 * too small to hold it.
4307 static inline size_t copy_token(const char **buf,
4313 len = next_token(buf);
4314 if (len < token_size) {
4315 memcpy(token, *buf, len);
4316 *(token + len) = '\0';
4324 * Finds the next token in *buf, dynamically allocates a buffer big
4325 * enough to hold a copy of it, and copies the token into the new
4326 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4327 * that a duplicate buffer is created even for a zero-length token.
4329 * Returns a pointer to the newly-allocated duplicate, or a null
4330 * pointer if memory for the duplicate was not available. If
4331 * the lenp argument is a non-null pointer, the length of the token
4332 * (not including the '\0') is returned in *lenp.
4334 * If successful, the *buf pointer will be updated to point beyond
4335 * the end of the found token.
4337 * Note: uses GFP_KERNEL for allocation.
4339 static inline char *dup_token(const char **buf, size_t *lenp)
4344 len = next_token(buf);
4345 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4348 *(dup + len) = '\0';
4358 * Parse the options provided for an "rbd add" (i.e., rbd image
4359 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4360 * and the data written is passed here via a NUL-terminated buffer.
4361 * Returns 0 if successful or an error code otherwise.
4363 * The information extracted from these options is recorded in
4364 * the other parameters which return dynamically-allocated
4365 * structures:
4366 *  ceph_opts
4367 * The address of a pointer that will refer to a ceph options
4368 * structure. Caller must release the returned pointer using
4369 * ceph_destroy_options() when it is no longer needed.
4370 *  opts
4371 * Address of an rbd options pointer. Fully initialized by
4372 * this function; caller must release with kfree().
4373 *  rbd_spec
4374 * Address of an rbd image specification pointer. Fully
4375 * initialized by this function based on parsed options.
4376 * Caller must release with rbd_spec_put().
4378 * The options passed take this form:
4379 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4382 * A comma-separated list of one or more monitor addresses.
4383 * A monitor address is an ip address, optionally followed
4384 * by a port number (separated by a colon).
4385 * I.e.: ip1[:port1][,ip2[:port2]...]
4386 *  <options>
4387 * A comma-separated list of ceph and/or rbd options.
4388 *  <pool_name>
4389 * The name of the rados pool containing the rbd image.
4390 *  <image_name>
4391 * The name of the image in that pool to map.
4392 *  <snap_name>
4393 * An optional snapshot name. If provided, the mapping will
4394 * present data from the image at the time that snapshot was
4395 * created. The image head is used if no snapshot name is
4396 * provided. Snapshot mappings are always read-only.
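*
* For example, writing the (illustrative) buffer
*	"1.2.3.4:6789 name=admin rbd foo snap1"
* to /sys/bus/rbd/add yields mon_addrs "1.2.3.4:6789", options
* "name=admin", pool "rbd", image "foo" and snapshot "snap1";
* omitting the final token maps the image head ("-") instead.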
4398 static int rbd_add_parse_args(const char *buf,
4399 struct ceph_options **ceph_opts,
4400 struct rbd_options **opts,
4401 struct rbd_spec **rbd_spec)
4405 const char *mon_addrs;
4407 size_t mon_addrs_size;
4408 struct rbd_spec *spec = NULL;
4409 struct rbd_options *rbd_opts = NULL;
4410 struct ceph_options *copts;
4413 /* The first four tokens are required */
4415 len = next_token(&buf);
4417 rbd_warn(NULL, "no monitor address(es) provided");
4421 mon_addrs_size = len + 1;
4425 options = dup_token(&buf, NULL);
4429 rbd_warn(NULL, "no options provided");
4433 spec = rbd_spec_alloc();
4437 spec->pool_name = dup_token(&buf, NULL);
4438 if (!spec->pool_name)
4440 if (!*spec->pool_name) {
4441 rbd_warn(NULL, "no pool name provided");
4445 spec->image_name = dup_token(&buf, NULL);
4446 if (!spec->image_name)
4448 if (!*spec->image_name) {
4449 rbd_warn(NULL, "no image name provided");
4454 * Snapshot name is optional; default is to use "-"
4455 * (indicating the head/no snapshot).
4457 len = next_token(&buf);
4459 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4460 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4461 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4462 ret = -ENAMETOOLONG;
4465 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4468 *(snap_name + len) = '\0';
4469 spec->snap_name = snap_name;
4471 /* Initialize all rbd options to the defaults */
4473 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4477 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4479 copts = ceph_parse_options(options, mon_addrs,
4480 mon_addrs + mon_addrs_size - 1,
4481 parse_rbd_opts_token, rbd_opts);
4482 if (IS_ERR(copts)) {
4483 ret = PTR_ERR(copts);
4504 * An rbd format 2 image has a unique identifier, distinct from the
4505 * name given to it by the user. Internally, that identifier is
4506 * what's used to specify the names of objects related to the image.
4508 * A special "rbd id" object is used to map an rbd image name to its
4509 * id. If that object doesn't exist, then there is no v2 rbd image
4510 * with the supplied name.
4512 * This function will record the given rbd_dev's image_id field if
4513 * it can be determined, and in that case will return 0. If any
4514 * errors occur a negative errno will be returned and the rbd_dev's
4515 * image_id field will be unchanged (and should be NULL).
4517 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4526 * When probing a parent image, the image id is already
4527 * known (and the image name likely is not). There's no
4528 * need to fetch the image id again in this case. We
4529 * do still need to set the image format though.
4531 if (rbd_dev->spec->image_id) {
4532 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4538 * First, see if the format 2 image id file exists, and if
4539 * so, get the image's persistent id from it.
4541 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4542 object_name = kmalloc(size, GFP_NOIO);
4545 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4546 dout("rbd id object name is %s\n", object_name);
4548 /* Response will be an encoded string, which includes a length */
4550 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4551 response = kzalloc(size, GFP_NOIO);
4557 /* If it doesn't exist we'll assume it's a format 1 image */
4559 ret = rbd_obj_method_sync(rbd_dev, object_name,
4560 "rbd", "get_id", NULL, 0,
4561 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4562 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4563 if (ret == -ENOENT) {
4564 image_id = kstrdup("", GFP_KERNEL);
4565 ret = image_id ? 0 : -ENOMEM;
4567 rbd_dev->image_format = 1;
4568 } else if (ret > sizeof (__le32)) {
4571 image_id = ceph_extract_encoded_string(&p, p + ret,
4573 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4575 rbd_dev->image_format = 2;
4581 rbd_dev->spec->image_id = image_id;
4582 dout("image_id is %s\n", image_id);
4591 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4595 /* Populate rbd image metadata */
4597 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4601 /* Version 1 images have no parent (no layering) */
4603 rbd_dev->parent_spec = NULL;
4604 rbd_dev->parent_overlap = 0;
4606 dout("discovered version 1 image, header name is %s\n",
4607 rbd_dev->header_name);
4612 kfree(rbd_dev->header_name);
4613 rbd_dev->header_name = NULL;
4614 kfree(rbd_dev->spec->image_id);
4615 rbd_dev->spec->image_id = NULL;
4620 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4625 ret = rbd_dev_v2_image_size(rbd_dev);
4629 /* Get the object prefix (a.k.a. block_name) for the image */
4631 ret = rbd_dev_v2_object_prefix(rbd_dev);
4635 /* Get and check the features for the image */
4637 ret = rbd_dev_v2_features(rbd_dev);
4641 /* If the image supports layering, get the parent info */
4643 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4644 ret = rbd_dev_v2_parent_info(rbd_dev);
4647 rbd_warn(rbd_dev, "WARNING: kernel support for "
4648 "layered rbd images is EXPERIMENTAL!");
4651 /* If the image supports fancy striping, get its parameters */
4653 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4654 ret = rbd_dev_v2_striping_info(rbd_dev);
4659 /* crypto and compression type aren't (yet) supported for v2 images */
4661 rbd_dev->header.crypt_type = 0;
4662 rbd_dev->header.comp_type = 0;
4664 /* Get the snapshot context, plus the header version */
4666 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4669 rbd_dev->header.obj_version = ver;
4671 dout("discovered version 2 image, header name is %s\n",
4672 rbd_dev->header_name);
4676 rbd_dev->parent_overlap = 0;
4677 rbd_spec_put(rbd_dev->parent_spec);
4678 rbd_dev->parent_spec = NULL;
4679 kfree(rbd_dev->header_name);
4680 rbd_dev->header_name = NULL;
4681 kfree(rbd_dev->header.object_prefix);
4682 rbd_dev->header.object_prefix = NULL;
4684 return ret;
4685 }
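/*
 * Descriptive note: a format 2 probe gathers the image metadata in a
 * fixed order: size, object prefix, feature bits, then the optional
 * parent and striping information, and finally the snapshot context.
 * Each step above is a separate synchronous class method call against
 * the header object (get_size, get_object_prefix, and so on, per the
 * v2 image class interface), and any failure unwinds through out_err.
 */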
4687 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4688 {
4689 struct rbd_device *parent = NULL;
4690 struct rbd_spec *parent_spec;
4691 struct rbd_client *rbdc;
4692 int ret;
4694 if (!rbd_dev->parent_spec)
4695 return 0;
4696 /*
4697 * We need to pass a reference to the client and the parent
4698 * spec when creating the parent rbd_dev. Images related by
4699 * parent/child relationships always share both.
4700 */
4701 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4702 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4704 ret = -ENOMEM;
4705 parent = rbd_dev_create(rbdc, parent_spec);
4706 if (!parent)
4707 goto out_err;
4709 ret = rbd_dev_image_probe(parent);
4710 if (ret < 0)
4711 goto out_err;
4712 rbd_dev->parent = parent;
4714 return 0;
4715 out_err:
4716 if (parent) {
4717 rbd_spec_put(rbd_dev->parent_spec);
4718 kfree(rbd_dev->header_name);
4719 rbd_dev_destroy(parent);
4720 } else {
4721 rbd_put_client(rbdc);
4722 rbd_spec_put(parent_spec);
4723 }
4725 return ret;
4726 }
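/*
 * Descriptive note: rbd_dev_image_probe() on the parent ends up back
 * in rbd_dev_probe_parent(), so a chain of layered images builds a
 * linked list of rbd_device structures:
 *
 *	mapped child -> parent -> grandparent -> ... -> base image
 *
 * with every device on the chain holding its own counted references
 * to the shared client and to its parent's spec.
 */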
4728 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4729 {
4730 int ret;
4732 ret = rbd_dev_mapping_set(rbd_dev);
4733 if (ret)
4734 return ret;
4736 /* generate unique id: find highest unique id, add one */
4737 rbd_dev_id_get(rbd_dev);
4739 /* Fill in the device name, now that we have its id. */
4740 BUILD_BUG_ON(DEV_NAME_LEN
4741 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4742 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4744 /* Get our block major device number. */
4746 ret = register_blkdev(0, rbd_dev->name);
4747 if (ret < 0)
4748 goto err_out_id;
4749 rbd_dev->major = ret;
4751 /* Set up the blkdev mapping. */
4753 ret = rbd_init_disk(rbd_dev);
4754 if (ret)
4755 goto err_out_blkdev;
4757 ret = rbd_bus_add_dev(rbd_dev);
4758 if (ret)
4759 goto err_out_disk;
4761 /* Everything's ready. Announce the disk to the world. */
4763 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4764 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4765 add_disk(rbd_dev->disk);
4767 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4768 (unsigned long long) rbd_dev->mapping.size);
4770 return ret;
4772 err_out_disk:
4773 rbd_free_disk(rbd_dev);
4774 err_out_blkdev:
4775 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4776 err_out_id:
4777 rbd_dev_id_put(rbd_dev);
4778 rbd_dev_mapping_clear(rbd_dev);
4780 return ret;
4781 }
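/*
 * Descriptive note: the error exits above unwind in exact reverse
 * order of setup (disk, then the block device registration, then the
 * device id and mapping), the usual kernel pattern for multi-step
 * initialization.
 */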
4783 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4784 {
4785 struct rbd_spec *spec = rbd_dev->spec;
4786 size_t size;
4788 /* Record the header object name for this rbd image. */
4790 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4792 if (rbd_dev->image_format == 1)
4793 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4794 else
4795 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4797 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4798 if (!rbd_dev->header_name)
4799 return -ENOMEM;
4801 if (rbd_dev->image_format == 1)
4802 sprintf(rbd_dev->header_name, "%s%s",
4803 spec->image_name, RBD_SUFFIX);
4804 else
4805 sprintf(rbd_dev->header_name, "%s%s",
4806 RBD_HEADER_PREFIX, spec->image_id);
4808 return 0;
4809 }
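/*
 * Example (assuming the definitions in rbd_types.h, where RBD_SUFFIX
 * is ".rbd" and RBD_HEADER_PREFIX is "rbd_header."): a format 1 image
 * named "foo" has header object "foo.rbd", while a format 2 image
 * whose id is "29ab6bd1e78c" has header object
 * "rbd_header.29ab6bd1e78c".  The id shown is made up for
 * illustration.
 */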
4810 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4811 {
4812 rbd_header_free(&rbd_dev->header);
4813 rbd_assert(rbd_dev->rbd_client != NULL);
4814 rbd_spec_put(rbd_dev->parent_spec);
4815 kfree(rbd_dev->header_name);
4816 rbd_dev_destroy(rbd_dev);
4817 }
4819 /*
4820 * Probe for the existence of the header object for the given rbd
4821 * device. For format 2 images this includes determining the image
4822 * id.
4823 */
4824 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4825 {
4826 int ret;
4827 int tmp;
4829 /*
4830 * Get the id from the image id object. If it's not a
4831 * format 2 image, we'll get ENOENT back, and we'll assume
4832 * it's a format 1 image.
4833 */
4834 ret = rbd_dev_image_id(rbd_dev);
4835 if (ret)
4836 return ret;
4837 rbd_assert(rbd_dev->spec->image_id);
4838 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4840 ret = rbd_dev_header_name(rbd_dev);
4841 if (ret)
4842 goto err_out_format;
4844 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4845 if (ret)
4846 goto out_header_name;
4848 if (rbd_dev->image_format == 1)
4849 ret = rbd_dev_v1_probe(rbd_dev);
4850 else
4851 ret = rbd_dev_v2_probe(rbd_dev);
4852 if (ret)
4853 goto err_out_watch;
4855 ret = rbd_dev_snaps_update(rbd_dev);
4856 if (ret)
4857 goto err_out_watch;
4859 ret = rbd_dev_spec_update(rbd_dev);
4860 if (ret)
4861 goto err_out_snaps;
4863 ret = rbd_dev_probe_parent(rbd_dev);
4864 if (ret)
4865 goto err_out_snaps;
4867 ret = rbd_dev_device_setup(rbd_dev);
4868 if (ret)
4869 goto err_out_parent;
4871 return 0;
4872 err_out_parent:
4873 rbd_dev_remove_parent(rbd_dev);
4874 rbd_header_free(&rbd_dev->header);
4875 err_out_snaps:
4876 rbd_remove_all_snaps(rbd_dev);
4877 err_out_watch:
4878 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4879 if (tmp)
4880 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4881 out_header_name:
4882 kfree(rbd_dev->header_name);
4883 rbd_dev->header_name = NULL;
4884 err_out_format:
4885 rbd_dev->image_format = 0;
4886 kfree(rbd_dev->spec->image_id);
4887 rbd_dev->spec->image_id = NULL;
4889 dout("probe failed, returning %d\n", ret);
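/*
 * Note: the watch established on the header object during the probe
 * is how the OSD notifies this client of header changes (a resize or
 * a new snapshot, for instance), which is why the error paths taken
 * after registration tear it down again before failing.
 */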
4894 static ssize_t rbd_add(struct bus_type *bus,
4895 const char *buf,
4896 size_t count)
4897 {
4898 struct rbd_device *rbd_dev = NULL;
4899 struct ceph_options *ceph_opts = NULL;
4900 struct rbd_options *rbd_opts = NULL;
4901 struct rbd_spec *spec = NULL;
4902 struct rbd_client *rbdc;
4903 struct ceph_osd_client *osdc;
4904 ssize_t rc = -ENOMEM;
4906 if (!try_module_get(THIS_MODULE))
4907 return -ENODEV;
4909 /* parse add command */
4910 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4911 if (rc < 0)
4912 goto err_out_module;
4914 rbdc = rbd_get_client(ceph_opts);
4915 if (IS_ERR(rbdc)) {
4916 rc = PTR_ERR(rbdc);
4917 goto err_out_args;
4918 }
4919 ceph_opts = NULL; /* rbd_dev client now owns this */
4922 osdc = &rbdc->client->osdc;
4923 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4924 if (rc < 0)
4925 goto err_out_client;
4926 spec->pool_id = (u64)rc;
4928 /* The ceph file layout needs to fit pool id in 32 bits */
4930 if (spec->pool_id > (u64)U32_MAX) {
4931 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4932 (unsigned long long)spec->pool_id, U32_MAX);
4933 rc = -EIO;
4934 goto err_out_client;
4935 }
4937 rbd_dev = rbd_dev_create(rbdc, spec);
4938 if (!rbd_dev)
4939 goto err_out_client;
4940 rbdc = NULL; /* rbd_dev now owns this */
4941 spec = NULL; /* rbd_dev now owns this */
4943 rbd_dev->mapping.read_only = rbd_opts->read_only;
4944 kfree(rbd_opts);
4945 rbd_opts = NULL; /* done with this */
4947 rc = rbd_dev_image_probe(rbd_dev);
4948 if (rc < 0)
4949 goto err_out_rbd_dev;
4951 return count;
4952 err_out_rbd_dev:
4953 kfree(rbd_dev->header_name);
4954 rbd_dev_destroy(rbd_dev);
4955 err_out_client:
4956 rbd_put_client(rbdc);
4957 err_out_args:
4958 if (ceph_opts)
4959 ceph_destroy_options(ceph_opts);
4960 kfree(rbd_opts);
4961 rbd_spec_put(spec);
4962 err_out_module:
4963 module_put(THIS_MODULE);
4965 dout("Error adding device %s\n", buf);
4967 return (ssize_t)rc;
4968 }
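/*
 * Usage sketch (see Documentation/ABI/testing/sysfs-bus-rbd; the
 * monitor address, key, and names below are examples only):
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * that is, monitor address(es), a comma-separated option list, the
 * pool name, the image name, and an optional snapshot name, all
 * parsed by rbd_add_parse_args().
 */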
4970 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4971 {
4972 struct list_head *tmp;
4973 struct rbd_device *rbd_dev;
4975 spin_lock(&rbd_dev_list_lock);
4976 list_for_each(tmp, &rbd_dev_list) {
4977 rbd_dev = list_entry(tmp, struct rbd_device, node);
4978 if (rbd_dev->dev_id == dev_id) {
4979 spin_unlock(&rbd_dev_list_lock);
4980 return rbd_dev;
4981 }
4982 }
4983 spin_unlock(&rbd_dev_list_lock);
4985 return NULL;
4986 }
4987 static void rbd_dev_device_release(struct device *dev)
4988 {
4989 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4991 rbd_free_disk(rbd_dev);
4992 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4993 rbd_dev_clear_mapping(rbd_dev);
4994 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4996 rbd_dev_id_put(rbd_dev);
4997 rbd_dev_mapping_clear(rbd_dev);
4999 rbd_dev_image_release(rbd_dev);
5000 }
5002 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5003 {
5004 while (rbd_dev->parent) {
5005 struct rbd_device *first = rbd_dev;
5006 struct rbd_device *second = first->parent;
5007 struct rbd_device *third;
5008 int ret;
5010 /*
5011 * Follow to the parent with no grandparent and
5012 * remove it.
5013 */
5014 while (second && (third = second->parent)) {
5015 first = second;
5016 second = third;
5017 }
5019 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
5020 if (ret)
5021 rbd_warn(rbd_dev,
5022 "failed to cancel watch event (%d)\n", ret);
5023 rbd_remove_all_snaps(second);
5024 rbd_bus_del_dev(second);
5025 first->parent = NULL;
5026 first->parent_overlap = 0;
5028 rbd_assert(first->parent_spec);
5029 rbd_spec_put(first->parent_spec);
5030 first->parent_spec = NULL;
5031 }
5032 }
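/*
 * Descriptive note: each pass of the outer loop walks down to the
 * deepest ancestor (the device whose parent pointer is NULL) and
 * removes it, so a chain of layered images is torn down starting from
 * the base image and working back toward the mapped child.
 */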
5034 static ssize_t rbd_remove(struct bus_type *bus,
5035 const char *buf,
5036 size_t count)
5037 {
5038 struct rbd_device *rbd_dev = NULL;
5039 int target_id;
5040 unsigned long ul;
5041 int ret;
5043 ret = strict_strtoul(buf, 10, &ul);
5044 if (ret)
5045 return ret;
5047 /* convert to int; abort if we lost anything in the conversion */
5048 target_id = (int) ul;
5049 if (target_id != ul)
5050 return -EINVAL;
5052 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5054 rbd_dev = __rbd_get_dev(target_id);
5055 if (!rbd_dev) {
5056 ret = -ENOENT;
5057 goto done;
5058 }
5060 spin_lock_irq(&rbd_dev->lock);
5061 if (rbd_dev->open_count)
5062 ret = -EBUSY;
5063 else
5064 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5065 spin_unlock_irq(&rbd_dev->lock);
5066 if (ret < 0)
5067 goto done;
5069 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
5070 if (ret) {
5071 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5072 clear_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5073 goto done;
5074 }
5076 ret = count;
5078 rbd_dev_remove_parent(rbd_dev);
5080 rbd_remove_all_snaps(rbd_dev);
5081 rbd_bus_del_dev(rbd_dev);
5082 module_put(THIS_MODULE);
5083 done:
5084 mutex_unlock(&ctl_mutex);
5086 return ret;
5087 }
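/*
 * Usage sketch: writing a device id to the bus control file unmaps
 * the corresponding image, for example
 *
 *	$ echo 1 > /sys/bus/rbd/remove
 *
 * where "1" is the id the device was assigned when it appeared as
 * /dev/rbd1.
 */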
5089 /*
5090 * create control files in sysfs
5091 * /sys/bus/rbd/...
5092 */
5093 static int rbd_sysfs_init(void)
5094 {
5095 int ret;
5097 ret = device_register(&rbd_root_dev);
5098 if (ret < 0)
5099 return ret;
5101 ret = bus_register(&rbd_bus_type);
5102 if (ret < 0)
5103 device_unregister(&rbd_root_dev);
5105 return ret;
5106 }
5108 static void rbd_sysfs_cleanup(void)
5109 {
5110 bus_unregister(&rbd_bus_type);
5111 device_unregister(&rbd_root_dev);
5112 }
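/*
 * Note: cleanup is the mirror image of rbd_sysfs_init(): the bus is
 * unregistered before the root device that anchors it.
 */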
5114 static int __init rbd_init(void)
5115 {
5116 int rc;
5118 if (!libceph_compatible(NULL)) {
5119 rbd_warn(NULL, "libceph incompatibility (quitting)");
5120 return -EINVAL;
5121 }
5123 rc = rbd_sysfs_init();
5124 if (rc)
5125 return rc;
5126 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5130 static void __exit rbd_exit(void)
5131 {
5132 rbd_sysfs_cleanup();
5133 }
5135 module_init(rbd_init);
5136 module_exit(rbd_exit);
5138 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5139 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5140 MODULE_DESCRIPTION("rados block device");
5142 /* following authorship retained from original osdblk.c */
5143 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5145 MODULE_LICENSE("GPL");