3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
37 #include <linux/kernel.h>
38 #include <linux/device.h>
39 #include <linux/module.h>
41 #include <linux/blkdev.h>
43 #include "rbd_types.h"
45 #define RBD_DEBUG /* Activate rbd_assert() calls */
48 * The basic unit of block I/O is a sector. It is interpreted in a
49 * number of contexts in Linux (blk, bio, genhd), but the default is
50 * universally 512 bytes. These symbols are just slightly more
51 * meaningful than the bare numbers they represent.
53 #define SECTOR_SHIFT 9
54 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
56 #define RBD_DRV_NAME "rbd"
57 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
59 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
61 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
62 #define RBD_MAX_SNAP_NAME_LEN \
63 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
65 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
67 #define RBD_SNAP_HEAD_NAME "-"
69 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
71 /* This allows a single page to hold an image name sent by OSD */
72 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
73 #define RBD_IMAGE_ID_LEN_MAX 64
75 #define RBD_OBJ_PREFIX_LEN_MAX 64
79 #define RBD_FEATURE_LAYERING (1<<0)
80 #define RBD_FEATURE_STRIPINGV2 (1<<1)
81 #define RBD_FEATURES_ALL \
82 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
84 /* Features supported by this (client software) implementation. */
86 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
89 * An RBD device name will be "rbd#", where the "rbd" comes from
90 * RBD_DRV_NAME above, and # is a unique integer identifier.
91 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
92 * enough to hold all possible device names.
94 #define DEV_NAME_LEN 32
95 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
98 * block device image metadata (in-memory version)
100 struct rbd_image_header {
101 /* These four fields never change for a given rbd image */
108 /* The remaining fields need to be updated occasionally */
110 struct ceph_snap_context *snapc;
119 * An rbd image specification.
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
145 const char *pool_name;
147 const char *image_id;
148 const char *image_name;
151 const char *snap_name;
157 * an instance of the client. multiple devices may share an rbd client.
160 struct ceph_client *client;
162 struct list_head node;
165 struct rbd_img_request;
166 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
168 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
170 struct rbd_obj_request;
171 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
173 enum obj_request_type {
174 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
178 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
179 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
180 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
181 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
184 struct rbd_obj_request {
185 const char *object_name;
186 u64 offset; /* object start byte */
187 u64 length; /* bytes from offset */
191 * An object request associated with an image will have its
192 * img_data flag set; a standalone object request will not.
194 * A standalone object request will have which == BAD_WHICH
195 * and a null obj_request pointer.
197 * An object request initiated in support of a layered image
198 * object (to check for its existence before a write) will
199 * have which == BAD_WHICH and a non-null obj_request pointer.
201 * Finally, an object request for rbd image data will have
202 * which != BAD_WHICH, and will have a non-null img_request
203 * pointer. The value of which will be in the range
204 * 0..(img_request->obj_request_count-1).
207 struct rbd_obj_request *obj_request; /* STAT op */
209 struct rbd_img_request *img_request;
211 /* links for img_request->obj_requests list */
212 struct list_head links;
215 u32 which; /* posn image request list */
217 enum obj_request_type type;
219 struct bio *bio_list;
225 struct page **copyup_pages;
227 struct ceph_osd_request *osd_req;
229 u64 xferred; /* bytes transferred */
232 rbd_obj_callback_t callback;
233 struct completion completion;
239 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
240 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
241 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
244 struct rbd_img_request {
245 struct rbd_device *rbd_dev;
246 u64 offset; /* starting image byte offset */
247 u64 length; /* byte count from offset */
250 u64 snap_id; /* for reads */
251 struct ceph_snap_context *snapc; /* for writes */
254 struct request *rq; /* block request */
255 struct rbd_obj_request *obj_request; /* obj req initiator */
257 struct page **copyup_pages;
258 spinlock_t completion_lock;/* protects next_completion */
260 rbd_img_callback_t callback;
261 u64 xferred;/* aggregate bytes transferred */
262 int result; /* first nonzero obj_request result */
264 u32 obj_request_count;
265 struct list_head obj_requests; /* rbd_obj_request structs */
270 #define for_each_obj_request(ireq, oreq) \
271 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
272 #define for_each_obj_request_from(ireq, oreq) \
273 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
274 #define for_each_obj_request_safe(ireq, oreq, n) \
275 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
280 struct list_head node;
295 int dev_id; /* blkdev unique id */
297 int major; /* blkdev assigned major */
298 struct gendisk *disk; /* blkdev's gendisk and rq */
300 u32 image_format; /* Either 1 or 2 */
301 struct rbd_client *rbd_client;
303 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
305 spinlock_t lock; /* queue, flags, open_count */
307 struct rbd_image_header header;
308 unsigned long flags; /* possibly lock protected */
309 struct rbd_spec *spec;
313 struct ceph_file_layout layout;
315 struct ceph_osd_event *watch_event;
316 struct rbd_obj_request *watch_request;
318 struct rbd_spec *parent_spec;
320 struct rbd_device *parent;
322 /* protects updating the header */
323 struct rw_semaphore header_rwsem;
325 struct rbd_mapping mapping;
327 struct list_head node;
329 /* list of snapshots */
330 struct list_head snaps;
334 unsigned long open_count; /* protected by lock */
338 * Flag bits for rbd_dev->flags. If atomicity is required,
339 * rbd_dev->lock is used to protect access.
341 * Currently, only the "removing" flag (which is coupled with the
342 * "open_count" field) requires atomic access.
345 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
346 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
349 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
351 static LIST_HEAD(rbd_dev_list); /* devices */
352 static DEFINE_SPINLOCK(rbd_dev_list_lock);
354 static LIST_HEAD(rbd_client_list); /* clients */
355 static DEFINE_SPINLOCK(rbd_client_list_lock);
357 static int rbd_img_request_submit(struct rbd_img_request *img_request);
359 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
361 static void rbd_dev_device_release(struct device *dev);
362 static void rbd_snap_destroy(struct rbd_snap *snap);
364 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
366 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
368 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
370 static struct bus_attribute rbd_bus_attrs[] = {
371 __ATTR(add, S_IWUSR, NULL, rbd_add),
372 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
376 static struct bus_type rbd_bus_type = {
378 .bus_attrs = rbd_bus_attrs,
381 static void rbd_root_dev_release(struct device *dev)
385 static struct device rbd_root_dev = {
387 .release = rbd_root_dev_release,
/*
 * rbd_warn() - log a KERN_WARNING message for a device.
 *
 * Prefixes the printf-style message with the most specific identity
 * available for @rbd_dev: the gendisk name, else the image name, else
 * the image id, else the raw pointer; just the driver name when
 * @rbd_dev is NULL.  The %pV / struct va_format pair forwards the
 * caller's varargs to printk without re-formatting.
 */
390 static __printf(2, 3)
391 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
393 struct va_format vaf;
401 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
402 else if (rbd_dev->disk)
403 printk(KERN_WARNING "%s: %s: %pV\n",
404 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
405 else if (rbd_dev->spec && rbd_dev->spec->image_name)
406 printk(KERN_WARNING "%s: image %s: %pV\n",
407 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
408 else if (rbd_dev->spec && rbd_dev->spec->image_id)
409 printk(KERN_WARNING "%s: id %s: %pV\n",
410 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
/* Last resort: identify the device only by its struct address. */
412 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
413 RBD_DRV_NAME, rbd_dev, &vaf);
/*
 * rbd_assert() - debug-only sanity check.
 *
 * With RBD_DEBUG defined (see top of file) a failed assertion logs the
 * function, line and expression text at KERN_ERR; without RBD_DEBUG it
 * compiles away to a no-op.  NOTE(review): the visible format fragments
 * consume __func__, __LINE__ and #expr, but the conversion directive
 * for __LINE__ is on a line not shown here -- confirm against the full
 * file.
 */
418 #define rbd_assert(expr) \
419 if (unlikely(!(expr))) { \
420 printk(KERN_ERR "\nAssertion failure in %s() " \
422 "\trbd_assert(%s);\n\n", \
423 __func__, __LINE__, #expr); \
426 #else /* !RBD_DEBUG */
427 # define rbd_assert(expr) ((void) 0)
428 #endif /* !RBD_DEBUG */
430 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
431 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
432 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
434 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
435 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
436 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
438 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
439 u8 *order, u64 *snap_size);
440 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
442 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
/*
 * rbd_open() - block_device_operations .open handler.
 *
 * Rejects write opens of a read-only mapping, refuses new opens while
 * the REMOVING flag is set (the "removing" local), and otherwise bumps
 * open_count under rbd_dev->lock.  On success a reference is taken on
 * the device and the mapping's read-only state is propagated to the
 * bdev, both under ctl_mutex.
 */
444 static int rbd_open(struct block_device *bdev, fmode_t mode)
446 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
447 bool removing = false;
449 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
/* lock orders open_count against the removal path in rbd_remove() */
452 spin_lock_irq(&rbd_dev->lock);
453 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
456 rbd_dev->open_count++;
457 spin_unlock_irq(&rbd_dev->lock);
461 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
462 (void) get_device(&rbd_dev->dev);
463 set_device_ro(bdev, rbd_dev->mapping.read_only);
464 mutex_unlock(&ctl_mutex);
/*
 * rbd_release() - block_device_operations .release handler.
 *
 * Drops the open_count taken in rbd_open() (asserting it was nonzero
 * beforehand) and releases the device reference under ctl_mutex.
 */
469 static int rbd_release(struct gendisk *disk, fmode_t mode)
471 struct rbd_device *rbd_dev = disk->private_data;
472 unsigned long open_count_before;
474 spin_lock_irq(&rbd_dev->lock);
475 open_count_before = rbd_dev->open_count--;
476 spin_unlock_irq(&rbd_dev->lock);
477 rbd_assert(open_count_before > 0);
479 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
480 put_device(&rbd_dev->dev);
481 mutex_unlock(&ctl_mutex);
/* Block device operations table; open/release handlers defined above. */
486 static const struct block_device_operations rbd_bd_ops = {
487 .owner = THIS_MODULE,
489 .release = rbd_release,
493 * Initialize an rbd client instance.
/*
 * rbd_client_create() - allocate and initialize an rbd client instance.
 *
 * Creates a ceph client from @ceph_opts, opens a session, and links the
 * new rbd_client onto rbd_client_list under rbd_client_list_lock.  On
 * success ownership of @ceph_opts passes to the ceph client (see the
 * NULL-out below).  On failure the error paths (only partially visible
 * here) destroy the ceph client and/or the options before returning --
 * presumably via ERR_PTR; confirm against the full file.
 */
496 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
498 struct rbd_client *rbdc;
501 dout("%s:\n", __func__);
502 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
506 kref_init(&rbdc->kref);
507 INIT_LIST_HEAD(&rbdc->node);
509 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
511 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
512 if (IS_ERR(rbdc->client))
514 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
516 ret = ceph_open_session(rbdc->client);
520 spin_lock(&rbd_client_list_lock);
521 list_add_tail(&rbdc->node, &rbd_client_list);
522 spin_unlock(&rbd_client_list_lock);
524 mutex_unlock(&ctl_mutex);
525 dout("%s: rbdc %p\n", __func__, rbdc);
/* error unwind: destroy the partially constructed client/options */
530 ceph_destroy_client(rbdc->client);
532 mutex_unlock(&ctl_mutex);
536 ceph_destroy_options(ceph_opts);
537 dout("%s: error %d\n", __func__, ret);
/* __rbd_get_client() - bump the client's refcount; returns its argument. */
542 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
544 kref_get(&rbdc->kref);
550 * Find a ceph client with specific addr and configuration. If
551 * found, bump its reference count.
/*
 * Sharing is skipped entirely when CEPH_OPT_NOSHARE is set.  The list
 * walk and the refcount bump happen together under rbd_client_list_lock
 * so a matching client cannot disappear before we hold a reference.
 */
553 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
555 struct rbd_client *client_node;
558 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
561 spin_lock(&rbd_client_list_lock);
562 list_for_each_entry(client_node, &rbd_client_list, node) {
563 if (!ceph_compare_options(ceph_opts, client_node->client)) {
564 __rbd_get_client(client_node);
570 spin_unlock(&rbd_client_list_lock);
572 return found ? client_node : NULL;
/*
 * Option tokens for rbd map options ("read_only"/"ro" and
 * "read_write"/"rw").  The token enumerators and the Opt_last_*
 * range markers are declared on lines not shown here.
 */
582 /* string args above */
585 /* Boolean args above */
589 static match_table_t rbd_opts_tokens = {
591 /* string args above */
592 {Opt_read_only, "read_only"},
593 {Opt_read_only, "ro"}, /* Alternate spelling */
594 {Opt_read_write, "read_write"},
595 {Opt_read_write, "rw"}, /* Alternate spelling */
596 /* Boolean args above */
604 #define RBD_READ_ONLY_DEFAULT false
/*
 * parse_rbd_opts_token() - per-option callback for the map string.
 *
 * Classifies the token by its position relative to the Opt_last_*
 * markers (int-, string-, then Boolean-valued options) and records the
 * result into struct rbd_options -- currently only the read_only flag.
 */
606 static int parse_rbd_opts_token(char *c, void *private)
608 struct rbd_options *rbd_opts = private;
609 substring_t argstr[MAX_OPT_ARGS];
610 int token, intval, ret;
612 token = match_token(c, rbd_opts_tokens, argstr);
616 if (token < Opt_last_int) {
617 ret = match_int(&argstr[0], &intval);
619 pr_err("bad mount option arg (not int) "
623 dout("got int token %d val %d\n", token, intval);
624 } else if (token > Opt_last_int && token < Opt_last_string) {
625 dout("got string token %d val %s\n", token,
627 } else if (token > Opt_last_string && token < Opt_last_bool) {
628 dout("got Boolean token %d\n", token);
630 dout("got token %d\n", token);
635 rbd_opts->read_only = true;
638 rbd_opts->read_only = false;
648 * Get a ceph client with specific addr and configuration, if one does
649 * not exist create it.
/*
 * When an existing client is reused the caller's @ceph_opts are no
 * longer needed and are destroyed here; otherwise rbd_client_create()
 * takes ownership of them.
 */
651 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
653 struct rbd_client *rbdc;
655 rbdc = rbd_client_find(ceph_opts);
656 if (rbdc) /* using an existing client */
657 ceph_destroy_options(ceph_opts);
659 rbdc = rbd_client_create(ceph_opts);
665 * Destroy ceph client
/*
 * kref release callback: unlink the client from rbd_client_list and
 * destroy its ceph client.  NOTE(review): the body acquires
 * rbd_client_list_lock itself, so callers must NOT already hold it --
 * the original "Caller must hold rbd_client_list_lock" comment that
 * stood here contradicted the code and has been removed.
 */
669 static void rbd_client_release(struct kref *kref)
671 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
673 dout("%s: rbdc %p\n", __func__, rbdc);
674 spin_lock(&rbd_client_list_lock);
675 list_del(&rbdc->node);
676 spin_unlock(&rbd_client_list_lock);
678 ceph_destroy_client(rbdc->client);
683 * Drop reference to ceph client node. If it's not referenced anymore, release
/* rbd_put_client() - drop a reference; last put runs rbd_client_release(). */
686 static void rbd_put_client(struct rbd_client *rbdc)
689 kref_put(&rbdc->kref, rbd_client_release);
/* Only image formats 1 and 2 exist. */
692 static bool rbd_image_format_valid(u32 image_format)
694 return image_format == 1 || image_format == 2;
/*
 * rbd_dev_ondisk_valid() - sanity-check an on-disk (format 1) header.
 *
 * Verifies the magic header text, that the object order is at least a
 * sector (SECTOR_SHIFT) and small enough to shift a 32-bit int, and
 * that the snapshot count and packed snapshot-name length fit in a
 * size_t-sized snapshot header.
 */
697 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
702 /* The header has to start with the magic rbd header text */
703 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
706 /* The bio layer requires at least sector-sized I/O */
708 if (ondisk->options.order < SECTOR_SHIFT)
711 /* If we use u64 in a few spots we may be able to loosen this */
713 if (ondisk->options.order > 8 * sizeof (int) - 1)
717 * The size of a snapshot header has to fit in a size_t, and
718 * that limits the number of snapshots.
720 snap_count = le32_to_cpu(ondisk->snap_count);
721 size = SIZE_MAX - sizeof (struct ceph_snap_context);
722 if (snap_count > size / sizeof (__le64))
726 * Not only that, but the size of the entire the snapshot
727 * header must also be representable in a size_t.
729 size -= snap_count * sizeof (__le64);
730 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
737 * Create a new header structure, translate header format from the on-disk
/*
 * Copies the object prefix, the packed snapshot-name buffer and the
 * per-snapshot sizes out of @ondisk, then builds a ceph_snap_context
 * holding the snapshot ids.  Every allocation is unwound on the
 * (partially visible) error path, with each freed pointer reset to
 * NULL so the header is left in a consistent state.
 */
740 static int rbd_header_from_disk(struct rbd_image_header *header,
741 struct rbd_image_header_ondisk *ondisk)
748 memset(header, 0, sizeof (*header));
750 snap_count = le32_to_cpu(ondisk->snap_count);
/* object_prefix is bounded by the on-disk field width, then NUL-terminated */
752 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
753 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
754 if (!header->object_prefix)
756 memcpy(header->object_prefix, ondisk->object_prefix, len);
757 header->object_prefix[len] = '\0';
760 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
762 /* Save a copy of the snapshot names */
764 if (snap_names_len > (u64) SIZE_MAX)
766 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
767 if (!header->snap_names)
770 * Note that rbd_dev_v1_header_read() guarantees
771 * the ondisk buffer we're working with has
772 * snap_names_len bytes beyond the end of the
773 * snapshot id array, this memcpy() is safe.
775 memcpy(header->snap_names, &ondisk->snaps[snap_count],
778 /* Record each snapshot's size */
780 size = snap_count * sizeof (*header->snap_sizes);
781 header->snap_sizes = kmalloc(size, GFP_KERNEL);
782 if (!header->snap_sizes)
784 for (i = 0; i < snap_count; i++)
785 header->snap_sizes[i] =
786 le64_to_cpu(ondisk->snaps[i].image_size);
/* snap_count == 0: no name/size arrays at all */
788 header->snap_names = NULL;
789 header->snap_sizes = NULL;
792 header->features = 0; /* No features support in v1 images */
793 header->obj_order = ondisk->options.order;
794 header->crypt_type = ondisk->options.crypt_type;
795 header->comp_type = ondisk->options.comp_type;
797 /* Allocate and fill in the snapshot context */
799 header->image_size = le64_to_cpu(ondisk->image_size);
801 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
804 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
805 for (i = 0; i < snap_count; i++)
806 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
/* error unwind: free in reverse order and NULL each pointer */
811 kfree(header->snap_sizes);
812 header->snap_sizes = NULL;
813 kfree(header->snap_names);
814 header->snap_names = NULL;
815 kfree(header->object_prefix);
816 header->object_prefix = NULL;
/*
 * _rbd_dev_v1_snap_name() - duplicate the name of snapshot @which from
 * the packed, NUL-separated header.snap_names buffer.  Returns a
 * kstrdup()'d string (caller frees); may be NULL on allocation failure.
 */
821 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
823 const char *snap_name;
825 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
827 /* Skip over names until we find the one we are looking for */
829 snap_name = rbd_dev->header.snap_names;
831 snap_name += strlen(snap_name) + 1;
833 return kstrdup(snap_name, GFP_KERNEL);
/*
 * rbd_dev_snap_index() - linear search of the snap context for
 * @snap_id; returns BAD_SNAP_INDEX when the id is not present.
 */
836 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
838 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
841 for (which = 0; which < snapc->num_snaps; which++)
842 if (snapc->snaps[which] == snap_id)
845 return BAD_SNAP_INDEX;
/* rbd_dev_v1_snap_name() - id -> name via the index lookup above. */
848 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
853 which = rbd_dev_snap_index(rbd_dev, snap_id);
854 if (which == BAD_SNAP_INDEX)
857 return _rbd_dev_v1_snap_name(rbd_dev, which);
/*
 * rbd_snap_name() - name for @snap_id, dispatching on image format.
 *
 * NOTE(review): CEPH_NOSNAP returns the static RBD_SNAP_HEAD_NAME
 * literal while the v1 path returns a kstrdup()'d string -- confirm
 * the caller's ownership/freeing expectations before changing either.
 */
860 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
862 if (snap_id == CEPH_NOSNAP)
863 return RBD_SNAP_HEAD_NAME;
865 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
866 if (rbd_dev->image_format == 1)
867 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
869 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
/*
 * rbd_snap_size() - image size for @snap_id: the live header size for
 * CEPH_NOSNAP, the recorded per-snapshot size for format 1, or an OSD
 * query (_rbd_dev_v2_snap_size) for format 2.
 */
872 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
875 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
876 if (snap_id == CEPH_NOSNAP) {
877 *snap_size = rbd_dev->header.image_size;
878 } else if (rbd_dev->image_format == 1) {
881 which = rbd_dev_snap_index(rbd_dev, snap_id);
882 if (which == BAD_SNAP_INDEX)
885 *snap_size = rbd_dev->header.snap_sizes[which];
890 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
/*
 * rbd_snap_features() - feature bits for @snap_id; format 1 images
 * never have features, format 2 asks the OSD.
 */
899 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
902 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
903 if (snap_id == CEPH_NOSNAP) {
904 *snap_features = rbd_dev->header.features;
905 } else if (rbd_dev->image_format == 1) {
906 *snap_features = 0; /* No features for format 1 */
911 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
915 *snap_features = features;
/*
 * rbd_dev_mapping_set() - resolve the mapped snapshot name to an id
 * and record the mapping's size and feature bits.  Mapping anything
 * other than RBD_SNAP_HEAD_NAME (the live image) forces read-only.
 */
920 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
922 const char *snap_name = rbd_dev->spec->snap_name;
928 if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
929 snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
930 if (snap_id == CEPH_NOSNAP)
933 snap_id = CEPH_NOSNAP;
936 ret = rbd_snap_size(rbd_dev, snap_id, &size);
939 ret = rbd_snap_features(rbd_dev, snap_id, &features);
943 rbd_dev->mapping.size = size;
944 rbd_dev->mapping.features = features;
946 /* If we are mapping a snapshot it must be marked read-only */
948 if (snap_id != CEPH_NOSNAP)
949 rbd_dev->mapping.read_only = true;
/*
 * rbd_dev_mapping_clear() - reset mapping state to its unmapped
 * defaults: zero size and features, read-only.  NOTE(review):
 * rbd_dev_clear_mapping() below has the identical effect -- the two
 * should be consolidated.
 */
954 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
956 rbd_dev->mapping.size = 0;
957 rbd_dev->mapping.features = 0;
958 rbd_dev->mapping.read_only = true;
/*
 * rbd_dev_clear_mapping() - reset mapping state to its unmapped
 * defaults.
 *
 * This was a byte-for-byte duplicate of rbd_dev_mapping_clear();
 * delegate to it so the "unmapped" defaults live in exactly one place.
 */
static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
	rbd_dev_mapping_clear(rbd_dev);
}
/*
 * rbd_segment_name() - allocate (GFP_NOIO, safe in the I/O path) and
 * format the backing-object name "<prefix>.<segment#>" for the segment
 * containing @offset.  Caller frees the returned name; the error path
 * is only partially visible here -- presumably it frees the buffer and
 * returns NULL on truncation or snprintf failure.
 */
968 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
974 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO)
977 segment = offset >> rbd_dev->header.obj_order;
978 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
979 rbd_dev->header.object_prefix, segment);
980 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
981 pr_err("error formatting segment name for #%llu (%d)\n",
990 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
992 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
994 return offset & (segment_size - 1);
997 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
998 u64 offset, u64 length)
1000 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1002 offset &= segment_size - 1;
1004 rbd_assert(length <= U64_MAX - offset);
1005 if (offset + length > segment_size)
1006 length = segment_size - offset;
1012 * returns the size of an object in the image
1014 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1016 return 1 << header->obj_order;
/* bio_chain_put() - release every bio in a chain, walking bi_next. */
1023 static void bio_chain_put(struct bio *chain)
1029 chain = chain->bi_next;
1035 * zeros a bio chain, starting at specific offset
/*
 * Segments entirely before @start_ofs are left intact; the segment
 * containing the offset is zeroed from the offset onward, and all
 * later segments are zeroed completely.  bvec_kmap_irq() /
 * bvec_kunmap_irq() provide a short-lived atomic mapping of each
 * bio_vec page.
 */
1037 static void zero_bio_chain(struct bio *chain, int start_ofs)
1040 unsigned long flags;
1046 bio_for_each_segment(bv, chain, i) {
1047 if (pos + bv->bv_len > start_ofs) {
1048 int remainder = max(start_ofs - pos, 0);
1049 buf = bvec_kmap_irq(bv, &flags);
1050 memset(buf + remainder, 0,
1051 bv->bv_len - remainder);
1052 bvec_kunmap_irq(buf, &flags);
1057 chain = chain->bi_next;
1062 * similar to zero_bio_chain(), zeros data defined by a page array,
1063 * starting at the given byte offset from the start of the array and
1064 * continuing up to the given end offset. The pages array is
1065 * assumed to be big enough to hold all bytes up to the end.
/*
 * NOTE(review): @page starts at the page containing @offset; the
 * advance to the next page each iteration is on a line not shown
 * here -- confirm against the full file.  local_irq_save() brackets
 * kmap_atomic() to mirror the bvec_kmap_irq() discipline above.
 */
1067 static void zero_pages(struct page **pages, u64 offset, u64 end)
1069 struct page **page = &pages[offset >> PAGE_SHIFT];
1071 rbd_assert(end > offset);
1072 rbd_assert(end - offset <= (u64)SIZE_MAX);
1073 while (offset < end) {
1076 unsigned long flags;
1079 page_offset = (size_t)(offset & ~PAGE_MASK);
1080 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1081 local_irq_save(flags);
1082 kaddr = kmap_atomic(*page);
1083 memset(kaddr + page_offset, 0, length);
1084 kunmap_atomic(kaddr);
1085 local_irq_restore(flags);
1093 * Clone a portion of a bio, starting at the given byte offset
1094 * and continuing for the number of bytes indicated.
/*
 * A clone of the entire bio is delegated to bio_clone(); otherwise the
 * first and last affected bio_vec entries are located by walking the
 * segment list, the covered entries are copied wholesale, and the
 * first/last entries are trimmed (bv_offset/bv_len) to the requested
 * byte range.  Returns NULL on invalid arguments (WARN_ON_ONCE) or
 * allocation failure.
 */
1096 static struct bio *bio_clone_range(struct bio *bio_src,
1097 unsigned int offset,
1105 unsigned short end_idx;
1106 unsigned short vcnt;
1109 /* Handle the easy case for the caller */
1111 if (!offset && len == bio_src->bi_size)
1112 return bio_clone(bio_src, gfpmask);
1114 if (WARN_ON_ONCE(!len))
1116 if (WARN_ON_ONCE(len > bio_src->bi_size))
1118 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1121 /* Find first affected segment... */
1124 __bio_for_each_segment(bv, bio_src, idx, 0) {
1125 if (resid < bv->bv_len)
1127 resid -= bv->bv_len;
1131 /* ...and the last affected segment */
1134 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1135 if (resid <= bv->bv_len)
1137 resid -= bv->bv_len;
1139 vcnt = end_idx - idx + 1;
1141 /* Build the clone */
1143 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1145 return NULL; /* ENOMEM */
1147 bio->bi_bdev = bio_src->bi_bdev;
1148 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1149 bio->bi_rw = bio_src->bi_rw;
1150 bio->bi_flags |= 1 << BIO_CLONED;
1153 * Copy over our part of the bio_vec, then update the first
1154 * and last (or only) entries.
1156 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1157 vcnt * sizeof (struct bio_vec));
1158 bio->bi_io_vec[0].bv_offset += voff;
1160 bio->bi_io_vec[0].bv_len -= voff;
1161 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1163 bio->bi_io_vec[0].bv_len = len;
1166 bio->bi_vcnt = vcnt;
1174 * Clone a portion of a bio chain, starting at the given byte offset
1175 * into the first bio in the source chain and continuing for the
1176 * number of bytes indicated. The result is another bio chain of
1177 * exactly the given length, or a null pointer on error.
1179 * The bio_src and offset parameters are both in-out. On entry they
1180 * refer to the first source bio and the offset into that bio where
1181 * the start of data to be cloned is located.
1183 * On return, bio_src is updated to refer to the bio in the source
1184 * chain that contains first un-cloned byte, and *offset will
1185 * contain the offset of that byte within that bio.
/*
 * Implementation: repeatedly clone up to bi_size - off bytes from the
 * current source bio via bio_clone_range(), linking the clones into
 * @chain; running out of source bios before @len is exhausted is an
 * error and the partial chain is released via bio_chain_put().
 */
1187 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1188 unsigned int *offset,
1192 struct bio *bi = *bio_src;
1193 unsigned int off = *offset;
1194 struct bio *chain = NULL;
1197 /* Build up a chain of clone bios up to the limit */
1199 if (!bi || off >= bi->bi_size || !len)
1200 return NULL; /* Nothing to clone */
1204 unsigned int bi_size;
1208 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1209 goto out_err; /* EINVAL; ran out of bio's */
1211 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1212 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1214 goto out_err; /* ENOMEM */
1217 end = &bio->bi_next;
1220 if (off == bi->bi_size) {
1231 bio_chain_put(chain);
1237 * The default/initial value for all object request flags is 0. For
1238 * each flag, once its value is set to 1 it is never reset to 0
/* Mark a request as image data; setting it twice indicates a bug. */
1241 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1243 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1244 struct rbd_device *rbd_dev;
1246 rbd_dev = obj_request->img_request->rbd_dev;
1247 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1252 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1255 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
/* Mark a request complete; warns (with device context if any) on re-set. */
1258 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1260 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1261 struct rbd_device *rbd_dev = NULL;
1263 if (obj_request_img_data_test(obj_request))
1264 rbd_dev = obj_request->img_request->rbd_dev;
1265 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1270 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1273 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1277 * This sets the KNOWN flag after (possibly) setting the EXISTS
1278 * flag. The latter is set based on the "exists" value provided.
1280 * Note that for our purposes once an object exists it never goes
1281 * away again. It's possible that the response from two existence
1282 * checks are separated by the creation of the target object, and
1283 * the first ("doesn't exist") response arrives *after* the second
1284 * ("does exist"). In that case we ignore the second one.
1286 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1290 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1291 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1295 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1298 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1301 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1304 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
/*
 * kref wrappers for object and image requests.  The dout() traces read
 * kref.refcount directly, which is inherently racy and for debugging
 * only.
 */
1307 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1309 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1310 atomic_read(&obj_request->kref.refcount));
1311 kref_get(&obj_request->kref);
1314 static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference; last put runs rbd_obj_request_destroy(). */
1315 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1317 rbd_assert(obj_request != NULL);
1318 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1319 atomic_read(&obj_request->kref.refcount));
1320 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1323 static void rbd_img_request_get(struct rbd_img_request *img_request)
1325 dout("%s: img %p (was %d)\n", __func__, img_request,
1326 atomic_read(&img_request->kref.refcount));
1327 kref_get(&img_request->kref);
1330 static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference; last put runs rbd_img_request_destroy(). */
1331 static void rbd_img_request_put(struct rbd_img_request *img_request)
1333 rbd_assert(img_request != NULL);
1334 dout("%s: img %p (was %d)\n", __func__, img_request,
1335 atomic_read(&img_request->kref.refcount));
1336 kref_put(&img_request->kref, rbd_img_request_destroy);
/*
 * rbd_img_obj_request_add() - append @obj_request to its image
 * request's list, taking over the caller's reference.  "which" records
 * the request's position and obj_request_count is kept in sync; the
 * img_data flag is set exactly once here.
 */
1339 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1340 struct rbd_obj_request *obj_request)
1342 rbd_assert(obj_request->img_request == NULL);
1344 /* Image request now owns object's original reference */
1345 obj_request->img_request = img_request;
1346 obj_request->which = img_request->obj_request_count;
1347 rbd_assert(!obj_request_img_data_test(obj_request));
1348 obj_request_img_data_set(obj_request);
1349 rbd_assert(obj_request->which != BAD_WHICH);
1350 img_request->obj_request_count++;
1351 list_add_tail(&obj_request->links, &img_request->obj_requests);
1352 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1353 obj_request->which);
/*
 * rbd_img_obj_request_del() - unlink @obj_request from its image
 * request.  The asserts enforce LIFO removal: "which" must equal the
 * post-decrement obj_request_count.  Drops the reference the list held
 * (taken over in rbd_img_obj_request_add()).
 */
1356 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1357 struct rbd_obj_request *obj_request)
1359 rbd_assert(obj_request->which != BAD_WHICH);
1361 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1362 obj_request->which);
1363 list_del(&obj_request->links);
1364 rbd_assert(img_request->obj_request_count > 0);
1365 img_request->obj_request_count--;
1366 rbd_assert(obj_request->which == img_request->obj_request_count);
1367 obj_request->which = BAD_WHICH;
1368 rbd_assert(obj_request_img_data_test(obj_request));
1369 rbd_assert(obj_request->img_request == img_request);
1370 obj_request->img_request = NULL;
1371 obj_request->callback = NULL;
1372 rbd_obj_request_put(obj_request);
/*
 * Return whether "type" is one of the known object request data
 * types (no data, bio list, or page array).
 * NOTE(review): the switch statement and return lines appear to have
 * been lost in extraction here -- only the case labels survive.
 */
1375 static bool obj_request_type_valid(enum obj_request_type type)
1378 case OBJ_REQUEST_NODATA:
1379 case OBJ_REQUEST_BIO:
1380 case OBJ_REQUEST_PAGES:
/*
 * Hand an object request's prepared osd request to the OSD client
 * for transmission.  Returns the ceph_osdc_start_request() result
 * (0 on success, negative errno on failure).
 */
1387 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1388 struct rbd_obj_request *obj_request)
1390 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1392 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
/*
 * Complete an image request: on success, sum the per-object
 * transfer counts into img_request->xferred; then invoke the
 * caller's completion callback (if any) and drop the submitter's
 * reference on the image request.
 */
1395 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1398 dout("%s: img %p\n", __func__, img_request);
1401 * If no error occurred, compute the aggregate transfer
1402 * count for the image request. We could instead use
1403 * atomic64_cmpxchg() to update it as each object request
1404 * completes; not clear which way is better off hand.
1406 if (!img_request->result) {
1407 struct rbd_obj_request *obj_request;
1410 for_each_obj_request(img_request, obj_request)
1411 xferred += obj_request->xferred;
1412 img_request->xferred = xferred;
1415 if (img_request->callback)
1416 img_request->callback(img_request);
1418 rbd_img_request_put(img_request);
1421 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
/*
 * Block (interruptibly) until the object request's completion is
 * signalled.  Returns 0 once complete, or -ERESTARTSYS if
 * interrupted by a signal.
 */
1423 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1425 dout("%s: obj %p\n", __func__, obj_request)
1427 return wait_for_completion_interruptible(&obj_request->completion);
1431 * The default/initial value for all image request flags is 0. Each
1432 * is conditionally set to 1 at image request initialization time
1433 * and currently never change thereafter.
/* Mark an image request as a write request. */
1435 static void img_request_write_set(struct rbd_img_request *img_request)
1437 set_bit(IMG_REQ_WRITE, &img_request->flags);
/* Return true if the image request is a write request. */
1441 static bool img_request_write_test(struct rbd_img_request *img_request)
1444 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
/* Mark an image request as a child (parent-image) request. */
1447 static void img_request_child_set(struct rbd_img_request *img_request)
1449 set_bit(IMG_REQ_CHILD, &img_request->flags);
/* Return true if the image request is a child (parent-image) request. */
1453 static bool img_request_child_test(struct rbd_img_request *img_request)
1456 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
/* Mark an image request as targeting a layered (cloned) image. */
1459 static void img_request_layered_set(struct rbd_img_request *img_request)
1461 set_bit(IMG_REQ_LAYERED, &img_request->flags);
/* Return true if the image request targets a layered (cloned) image. */
1465 static bool img_request_layered_test(struct rbd_img_request *img_request)
1468 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
/*
 * Finish a read for an image object request.  -ENOENT (object does
 * not exist) is treated as a hole: the whole buffer is zero-filled
 * and the result cleared.  A successful short read is zero-filled
 * from the point the data ended.  Either way xferred is bumped to
 * the full requested length before the request is marked done.
 */
1472 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1474 u64 xferred = obj_request->xferred;
1475 u64 length = obj_request->length;
1477 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1478 obj_request, obj_request->img_request, obj_request->result,
1481 * ENOENT means a hole in the image. We zero-fill the
1482 * entire length of the request. A short read also implies
1483 * zero-fill to the end of the request. Either way we
1484 * update the xferred count to indicate the whole request
1487 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1488 if (obj_request->result == -ENOENT) {
1489 if (obj_request->type == OBJ_REQUEST_BIO)
1490 zero_bio_chain(obj_request->bio_list, 0);
1492 zero_pages(obj_request->pages, 0, length);
1493 obj_request->result = 0;
1494 obj_request->xferred = length;
1495 } else if (xferred < length && !obj_request->result) {
1496 if (obj_request->type == OBJ_REQUEST_BIO)
1497 zero_bio_chain(obj_request->bio_list, xferred);
1499 zero_pages(obj_request->pages, xferred, length);
1500 obj_request->xferred = length;
1502 obj_request_done_set(obj_request);
/*
 * Complete an object request: invoke its callback (if any) and wake
 * every waiter blocked in rbd_obj_request_wait().
 */
1505 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1507 dout("%s: obj %p cb %p\n", __func__, obj_request,
1508 obj_request->callback);
1509 if (obj_request->callback)
1510 obj_request->callback(obj_request);
1512 complete_all(&obj_request->completion);
/* Trivial op callback: nothing to process, just mark the request done. */
1515 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1517 dout("%s: obj %p\n", __func__, obj_request);
1518 obj_request_done_set(obj_request);
/*
 * OSD read completion.  For a layered image, -ENOENT within the
 * parent overlap means the data lives in the parent image, so the
 * read is redirected there via rbd_img_parent_read() instead of
 * being completed here.  Image-data reads go through the zero-fill
 * read callback; standalone reads are simply marked done.
 */
1521 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1523 struct rbd_img_request *img_request = NULL;
1524 struct rbd_device *rbd_dev = NULL;
1525 bool layered = false;
1527 if (obj_request_img_data_test(obj_request)) {
1528 img_request = obj_request->img_request;
1529 layered = img_request && img_request_layered_test(img_request);
1530 rbd_dev = img_request->rbd_dev;
1533 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1534 obj_request, img_request, obj_request->result,
1535 obj_request->xferred, obj_request->length);
/* "layered" implies img_request and rbd_dev are non-NULL here */
1536 if (layered && obj_request->result == -ENOENT &&
1537 obj_request->img_offset < rbd_dev->parent_overlap)
1538 rbd_img_parent_read(obj_request);
1539 else if (img_request)
1540 rbd_img_obj_request_read_callback(obj_request);
1542 obj_request_done_set(obj_request);
/*
 * OSD write completion.  The reported transfer count is forced to
 * the originally requested length (a successful write is never
 * short) and the request is marked done.
 */
1545 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1547 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1548 obj_request->result, obj_request->length);
1550 * There is no such thing as a successful short write. Set
1551 * it to our originally-requested length.
1553 obj_request->xferred = obj_request->length;
1554 obj_request_done_set(obj_request);
1558 * For a simple stat call there's nothing to do. We'll do more if
1559 * this is part of a write sequence for a layered image.
1561 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1563 dout("%s: obj %p\n", __func__, obj_request);
1564 obj_request_done_set(obj_request);
/*
 * Top-level OSD request completion handler, registered as
 * osd_req->r_callback.  Records the result and the transfer count
 * for the first op, then dispatches to the per-opcode callback.
 * The object request is completed here only if the per-op callback
 * marked it done (parent reads, for instance, defer completion).
 * NOTE(review): the "u16 opcode" declaration and "switch (opcode)"
 * line appear to have been lost in extraction.
 */
1567 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1568 struct ceph_msg *msg)
1570 struct rbd_obj_request *obj_request = osd_req->r_priv;
1573 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1574 rbd_assert(osd_req == obj_request->osd_req);
1575 if (obj_request_img_data_test(obj_request)) {
1576 rbd_assert(obj_request->img_request);
1577 rbd_assert(obj_request->which != BAD_WHICH);
1579 rbd_assert(obj_request->which == BAD_WHICH);
1582 if (osd_req->r_result < 0)
1583 obj_request->result = osd_req->r_result;
/* Requests built here carry at most two ops (copyup + write) */
1585 BUG_ON(osd_req->r_num_ops > 2);
1588 * We support a 64-bit length, but ultimately it has to be
1589 * passed to blk_end_request(), which takes an unsigned int.
1591 obj_request->xferred = osd_req->r_reply_op_len[0];
1592 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1593 opcode = osd_req->r_ops[0].op;
1595 case CEPH_OSD_OP_READ:
1596 rbd_osd_read_callback(obj_request);
1598 case CEPH_OSD_OP_WRITE:
1599 rbd_osd_write_callback(obj_request);
1601 case CEPH_OSD_OP_STAT:
1602 rbd_osd_stat_callback(obj_request);
1604 case CEPH_OSD_OP_CALL:
1605 case CEPH_OSD_OP_NOTIFY_ACK:
1606 case CEPH_OSD_OP_WATCH:
1607 rbd_osd_trivial_callback(obj_request);
1610 rbd_warn(NULL, "%s: unsupported op %hu\n",
1611 obj_request->object_name, (unsigned short) opcode);
1615 if (obj_request_done_test(obj_request))
1616 rbd_obj_request_complete(obj_request);
/*
 * Finalize a read osd request: build the on-wire request against
 * the image request's snapshot id (CEPH_NOSNAP for a standalone
 * object request).  No snap context or mtime for reads.
 */
1619 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1621 struct rbd_img_request *img_request = obj_request->img_request;
1622 struct ceph_osd_request *osd_req = obj_request->osd_req;
1625 rbd_assert(osd_req != NULL);
1627 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1628 ceph_osdc_build_request(osd_req, obj_request->offset,
1629 NULL, snap_id, NULL);
/*
 * Finalize a write osd request: build the on-wire request with the
 * image request's snapshot context (NULL for a standalone object
 * request) and the current time as the modification time.
 */
1632 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1634 struct rbd_img_request *img_request = obj_request->img_request;
1635 struct ceph_osd_request *osd_req = obj_request->osd_req;
1636 struct ceph_snap_context *snapc;
1637 struct timespec mtime = CURRENT_TIME;
1639 rbd_assert(osd_req != NULL);
1641 snapc = img_request ? img_request->snapc : NULL;
1642 ceph_osdc_build_request(osd_req, obj_request->offset,
1643 snapc, CEPH_NOSNAP, &mtime);
/*
 * Allocate a single-op osd request for an object request.  For an
 * image-data request the snap context comes from the image request
 * (writes only).  Sets up flags, the completion callback, the
 * object name (oid) and the file layout.  Returns NULL on
 * allocation failure.  GFP_ATOMIC is used because this can be
 * called from the request-handling path.
 */
1646 static struct ceph_osd_request *rbd_osd_req_create(
1647 struct rbd_device *rbd_dev,
1649 struct rbd_obj_request *obj_request)
1651 struct ceph_snap_context *snapc = NULL;
1652 struct ceph_osd_client *osdc;
1653 struct ceph_osd_request *osd_req;
1655 if (obj_request_img_data_test(obj_request)) {
1656 struct rbd_img_request *img_request = obj_request->img_request;
1658 rbd_assert(write_request ==
1659 img_request_write_test(img_request));
1661 snapc = img_request->snapc;
1664 /* Allocate and initialize the request, for the single op */
1666 osdc = &rbd_dev->rbd_client->client->osdc;
1667 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1669 return NULL; /* ENOMEM */
1672 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1674 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1676 osd_req->r_callback = rbd_osd_req_callback;
1677 osd_req->r_priv = obj_request;
1679 osd_req->r_oid_len = strlen(obj_request->object_name);
1680 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1681 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1683 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1689 * Create a copyup osd request based on the information in the
1690 * object request supplied. A copyup request has two osd ops,
1691 * a copyup method call, and a "normal" write request.
/*
 * Only valid for a write object request that belongs to an image
 * request (asserted below).  Returns NULL on allocation failure.
 */
1693 static struct ceph_osd_request *
1694 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1696 struct rbd_img_request *img_request;
1697 struct ceph_snap_context *snapc;
1698 struct rbd_device *rbd_dev;
1699 struct ceph_osd_client *osdc;
1700 struct ceph_osd_request *osd_req;
1702 rbd_assert(obj_request_img_data_test(obj_request));
1703 img_request = obj_request->img_request;
1704 rbd_assert(img_request);
1705 rbd_assert(img_request_write_test(img_request));
1707 /* Allocate and initialize the request, for the two ops */
1709 snapc = img_request->snapc;
1710 rbd_dev = img_request->rbd_dev;
1711 osdc = &rbd_dev->rbd_client->client->osdc;
1712 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1714 return NULL; /* ENOMEM */
1716 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1717 osd_req->r_callback = rbd_osd_req_callback;
1718 osd_req->r_priv = obj_request;
1720 osd_req->r_oid_len = strlen(obj_request->object_name);
1721 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1722 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1724 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Release an osd request (drops the osd client's reference on it). */
1730 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1732 ceph_osdc_put_request(osd_req);
1735 /* object_name is assumed to be a non-null pointer and NUL-terminated */
/*
 * Allocate and initialize an object request.  The object name is
 * copied into trailing space allocated together with the structure,
 * so the caller's string need not outlive the request.  The new
 * request starts unattached (which == BAD_WHICH) with a single
 * kref reference owned by the caller.  Returns NULL on allocation
 * failure (the kzalloc result is presumably checked on a line lost
 * in extraction).
 */
1737 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1738 u64 offset, u64 length,
1739 enum obj_request_type type)
1741 struct rbd_obj_request *obj_request;
1745 rbd_assert(obj_request_type_valid(type));
1747 size = strlen(object_name) + 1;
1748 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
/* Name storage is the space just past the struct itself */
1752 name = (char *)(obj_request + 1);
1753 obj_request->object_name = memcpy(name, object_name, size);
1754 obj_request->offset = offset;
1755 obj_request->length = length;
1756 obj_request->flags = 0;
1757 obj_request->which = BAD_WHICH;
1758 obj_request->type = type;
1759 INIT_LIST_HEAD(&obj_request->links);
1760 init_completion(&obj_request->completion);
1761 kref_init(&obj_request->kref);
1763 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1764 offset, length, (int)type, obj_request);
/*
 * kref release callback for an object request.  The request must
 * already be detached from any image request.  Releases the osd
 * request and any attached data (bio chain or page vector)
 * according to the request type, then frees the structure (the
 * final kfree is presumably on a line lost in extraction).
 */
1769 static void rbd_obj_request_destroy(struct kref *kref)
1771 struct rbd_obj_request *obj_request;
1773 obj_request = container_of(kref, struct rbd_obj_request, kref);
1775 dout("%s: obj %p\n", __func__, obj_request);
1777 rbd_assert(obj_request->img_request == NULL);
1778 rbd_assert(obj_request->which == BAD_WHICH);
1780 if (obj_request->osd_req)
1781 rbd_osd_req_destroy(obj_request->osd_req);
1783 rbd_assert(obj_request_type_valid(obj_request->type));
1784 switch (obj_request->type) {
1785 case OBJ_REQUEST_NODATA:
1786 break; /* Nothing to do */
1787 case OBJ_REQUEST_BIO:
1788 if (obj_request->bio_list)
1789 bio_chain_put(obj_request->bio_list);
1791 case OBJ_REQUEST_PAGES:
1792 if (obj_request->pages)
1793 ceph_release_page_vector(obj_request->pages,
1794 obj_request->page_count);
1802 * Caller is responsible for filling in the list of object requests
1803 * that comprises the image request, and the Linux request pointer
1804 * (if there is one).
/*
 * Allocate and initialize an image request covering [offset,
 * offset+length) of the device.  A write request pins the device's
 * current snapshot context (taken under header_rwsem); a read
 * records the mapped snapshot id instead.  Child/layered flags are
 * set from the device's parent spec.  Returns the new request with
 * one reference held by the caller (NULL check of kmalloc is
 * presumably on a line lost in extraction).
 */
1806 static struct rbd_img_request *rbd_img_request_create(
1807 struct rbd_device *rbd_dev,
1808 u64 offset, u64 length,
1812 struct rbd_img_request *img_request;
1814 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1818 if (write_request) {
/* Take a reference on the snap context while holding the header lock */
1819 down_read(&rbd_dev->header_rwsem);
1820 ceph_get_snap_context(rbd_dev->header.snapc);
1821 up_read(&rbd_dev->header_rwsem);
1824 img_request->rq = NULL;
1825 img_request->rbd_dev = rbd_dev;
1826 img_request->offset = offset;
1827 img_request->length = length;
1828 img_request->flags = 0;
1829 if (write_request) {
1830 img_request_write_set(img_request);
1831 img_request->snapc = rbd_dev->header.snapc;
1833 img_request->snap_id = rbd_dev->spec->snap_id;
1836 img_request_child_set(img_request);
1837 if (rbd_dev->parent_spec)
1838 img_request_layered_set(img_request);
1839 spin_lock_init(&img_request->completion_lock);
1840 img_request->next_completion = 0;
1841 img_request->callback = NULL;
1842 img_request->result = 0;
1843 img_request->obj_request_count = 0;
1844 INIT_LIST_HEAD(&img_request->obj_requests);
1845 kref_init(&img_request->kref);
1847 rbd_img_request_get(img_request); /* Avoid a warning */
1848 rbd_img_request_put(img_request); /* TEMPORARY */
1850 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1851 write_request ? "write" : "read", offset, length,
/*
 * kref release callback for an image request.  Detaches (and via
 * the del path drops) every remaining object request, releases the
 * snap context reference taken for writes, and for a child request
 * drops the reference on the original object request it was created
 * for.  The final kfree is presumably on a line lost in extraction.
 */
1857 static void rbd_img_request_destroy(struct kref *kref)
1859 struct rbd_img_request *img_request;
1860 struct rbd_obj_request *obj_request;
1861 struct rbd_obj_request *next_obj_request;
1863 img_request = container_of(kref, struct rbd_img_request, kref);
1865 dout("%s: img %p\n", __func__, img_request);
1867 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1868 rbd_img_obj_request_del(img_request, obj_request);
1869 rbd_assert(img_request->obj_request_count == 0);
1871 if (img_request_write_test(img_request))
1872 ceph_put_snap_context(img_request->snapc);
1874 if (img_request_child_test(img_request))
1875 rbd_obj_request_put(img_request->obj_request);
/*
 * Account the completion of one object request toward its image
 * request.  On error, logs a warning and records the first failure
 * as the image request's result.  Returns true while more of the
 * image request remains outstanding: for a child request that means
 * more object requests follow; otherwise blk_end_request() reports
 * whether the block-layer request is still incomplete.
 */
1880 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1882 struct rbd_img_request *img_request;
1883 unsigned int xferred;
1887 rbd_assert(obj_request_img_data_test(obj_request));
1888 img_request = obj_request->img_request;
1890 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1891 xferred = (unsigned int)obj_request->xferred;
1892 result = obj_request->result;
1894 struct rbd_device *rbd_dev = img_request->rbd_dev;
1896 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1897 img_request_write_test(img_request) ? "write" : "read",
1898 obj_request->length, obj_request->img_offset,
1899 obj_request->offset);
1900 rbd_warn(rbd_dev, " result %d xferred %x\n",
/* Only the first error is preserved for the image request */
1902 if (!img_request->result)
1903 img_request->result = result;
1906 /* Image object requests don't own their page array */
1908 if (obj_request->type == OBJ_REQUEST_PAGES) {
1909 obj_request->pages = NULL;
1910 obj_request->page_count = 0;
1913 if (img_request_child_test(img_request)) {
1914 rbd_assert(img_request->obj_request != NULL);
1915 more = obj_request->which < img_request->obj_request_count - 1;
1917 rbd_assert(img_request->rq != NULL);
1918 more = blk_end_request(img_request->rq, result, xferred);
/*
 * Per-object completion callback for image requests.  Object
 * requests may complete out of order; under completion_lock this
 * walks forward from next_completion, ending each request that is
 * done, and stops at the first one still pending.  When the walk
 * ends every object request, the image request is completed.
 */
1924 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1926 struct rbd_img_request *img_request;
1927 u32 which = obj_request->which;
1930 rbd_assert(obj_request_img_data_test(obj_request));
1931 img_request = obj_request->img_request;
1933 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1934 rbd_assert(img_request != NULL);
1935 rbd_assert(img_request->obj_request_count > 0);
1936 rbd_assert(which != BAD_WHICH);
1937 rbd_assert(which < img_request->obj_request_count);
1938 rbd_assert(which >= img_request->next_completion);
1940 spin_lock_irq(&img_request->completion_lock);
/* Out-of-order completion: wait for the earlier ones to finish */
1941 if (which != img_request->next_completion)
1944 for_each_obj_request_from(img_request, obj_request) {
1946 rbd_assert(which < img_request->obj_request_count);
1948 if (!obj_request_done_test(obj_request))
1950 more = rbd_img_obj_end_request(obj_request);
1954 rbd_assert(more ^ (which == img_request->obj_request_count));
1955 img_request->next_completion = which;
1957 spin_unlock_irq(&img_request->completion_lock);
1960 rbd_img_request_complete(img_request);
1964 * Split up an image request into one or more object requests, each
1965 * to a different object. The "type" parameter indicates whether
1966 * "data_desc" is the pointer to the head of a list of bio
1967 * structures, or the base of a page array. In either case this
1968 * function assumes data_desc describes memory sufficient to hold
1969 * all data described by the image request.
/*
 * For each RADOS object the byte range touches, an object request
 * is created, given its slice of the data (a cloned bio range or a
 * window into the page array), wired to a one-op osd request, and
 * added to the image request.  On failure all object requests
 * created so far are dropped and an error is returned (the error
 * paths between the numbered lines were partly lost in extraction).
 */
1971 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1972 enum obj_request_type type,
1975 struct rbd_device *rbd_dev = img_request->rbd_dev;
1976 struct rbd_obj_request *obj_request = NULL;
1977 struct rbd_obj_request *next_obj_request;
1978 bool write_request = img_request_write_test(img_request);
1979 struct bio *bio_list;
1980 unsigned int bio_offset = 0;
1981 struct page **pages;
1986 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1987 (int)type, data_desc);
1989 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1990 img_offset = img_request->offset;
1991 resid = img_request->length;
1992 rbd_assert(resid > 0);
1994 if (type == OBJ_REQUEST_BIO) {
1995 bio_list = data_desc;
/* The bio chain must start exactly at the image request's offset */
1996 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1998 rbd_assert(type == OBJ_REQUEST_PAGES);
/* One iteration per object touched by [img_offset, img_offset+resid) */
2003 struct ceph_osd_request *osd_req;
2004 const char *object_name;
2008 object_name = rbd_segment_name(rbd_dev, img_offset);
2011 offset = rbd_segment_offset(rbd_dev, img_offset);
2012 length = rbd_segment_length(rbd_dev, img_offset, resid);
2013 obj_request = rbd_obj_request_create(object_name,
2014 offset, length, type);
2015 kfree(object_name); /* object request has its own copy */
2019 if (type == OBJ_REQUEST_BIO) {
2020 unsigned int clone_size;
2022 rbd_assert(length <= (u64)UINT_MAX);
2023 clone_size = (unsigned int)length;
2024 obj_request->bio_list =
2025 bio_chain_clone_range(&bio_list,
2029 if (!obj_request->bio_list)
2032 unsigned int page_count;
2034 obj_request->pages = pages;
2035 page_count = (u32)calc_pages_for(offset, length);
2036 obj_request->page_count = page_count;
/* A partially-used last page is shared with the next object */
2037 if ((offset + length) & ~PAGE_MASK)
2038 page_count--; /* more on last page */
2039 pages += page_count;
2042 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2046 obj_request->osd_req = osd_req;
2047 obj_request->callback = rbd_img_obj_callback;
2049 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2051 if (type == OBJ_REQUEST_BIO)
2052 osd_req_op_extent_osd_data_bio(osd_req, 0,
2053 obj_request->bio_list, length);
2055 osd_req_op_extent_osd_data_pages(osd_req, 0,
2056 obj_request->pages, length,
2057 offset & ~PAGE_MASK, false, false);
2060 rbd_osd_req_format_write(obj_request);
2062 rbd_osd_req_format_read(obj_request);
2064 obj_request->img_offset = img_offset;
2065 rbd_img_obj_request_add(img_request, obj_request);
2067 img_offset += length;
/* Error cleanup: drop the current and all queued object requests */
2074 rbd_obj_request_put(obj_request);
2076 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2077 rbd_obj_request_put(obj_request);
/*
 * Completion callback for a copyup request.  Releases the page
 * vector that held the parent data (sized to one full object as
 * computed from obj_order), fixes up the transfer count for the
 * original write, then finishes via the normal image object
 * callback.
 */
2083 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2085 struct rbd_img_request *img_request;
2086 struct rbd_device *rbd_dev;
2090 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2091 rbd_assert(obj_request_img_data_test(obj_request));
2092 img_request = obj_request->img_request;
2093 rbd_assert(img_request);
2095 rbd_dev = img_request->rbd_dev;
2096 rbd_assert(rbd_dev);
/* Copyup pages always cover one full object */
2097 length = (u64)1 << rbd_dev->header.obj_order;
2098 page_count = (u32)calc_pages_for(0, length);
2100 rbd_assert(obj_request->copyup_pages);
2101 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2102 obj_request->copyup_pages = NULL;
2105 * We want the transfer count to reflect the size of the
2106 * original write request. There is no such thing as a
2107 * successful short write, so if the request was successful
2108 * we can just set it to the originally-requested length.
2110 if (!obj_request->result)
2111 obj_request->xferred = obj_request->length;
2113 /* Finish up with the normal image object callback */
2115 rbd_img_obj_callback(obj_request);
/*
 * Completion callback for the parent-image read issued by
 * rbd_img_obj_parent_read_full().  Takes ownership of the copyup
 * pages from the (child) image request, builds a two-op copyup osd
 * request for the original write (copyup method call + the original
 * write extent), and submits it.  On any failure the error path at
 * the bottom records the error on the original request and
 * completes it.
 */
2119 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2121 struct rbd_obj_request *orig_request;
2122 struct ceph_osd_request *osd_req;
2123 struct ceph_osd_client *osdc;
2124 struct rbd_device *rbd_dev;
2125 struct page **pages;
2130 rbd_assert(img_request_child_test(img_request));
2132 /* First get what we need from the image request */
2134 pages = img_request->copyup_pages;
2135 rbd_assert(pages != NULL);
2136 img_request->copyup_pages = NULL;
2138 orig_request = img_request->obj_request;
2139 rbd_assert(orig_request != NULL);
2140 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2141 result = img_request->result;
2142 obj_size = img_request->length;
2143 xferred = img_request->xferred;
2145 rbd_dev = img_request->rbd_dev;
2146 rbd_assert(rbd_dev);
2147 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
/* Done with the parent-read image request itself */
2149 rbd_img_request_put(img_request);
2154 /* Allocate the new copyup osd request for the original request */
2157 rbd_assert(!orig_request->osd_req);
2158 osd_req = rbd_osd_req_create_copyup(orig_request);
2161 orig_request->osd_req = osd_req;
2162 orig_request->copyup_pages = pages;
2164 /* Initialize the copyup op */
2166 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2167 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2170 /* Then the original write request op */
2172 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2173 orig_request->offset,
2174 orig_request->length, 0, 0);
2175 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2176 orig_request->length);
2178 rbd_osd_req_format_write(orig_request);
2180 /* All set, send it off. */
2182 orig_request->callback = rbd_img_obj_copyup_callback;
2183 osdc = &rbd_dev->rbd_client->client->osdc;
2184 result = rbd_obj_request_submit(osdc, orig_request);
2188 /* Record the error code and complete the request */
2190 orig_request->result = result;
2191 orig_request->xferred = 0;
2192 obj_request_done_set(orig_request);
2193 rbd_obj_request_complete(orig_request);
2197 * Read from the parent image the range of data that covers the
2198 * entire target of the given object request. This is used for
2199 * satisfying a layered image write request when the target of an
2200 * object request from the image request does not exist.
2202 * A page array big enough to hold the returned data is allocated
2203 * and supplied to rbd_img_request_fill() as the "data descriptor."
2204 * When the read completes, this page array will be transferred to
2205 * the original object request for the copyup operation.
2207 * If an error occurs, record it as the result of the original
2208 * object request and mark it done so it gets completed.
2210 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2212 struct rbd_img_request *img_request = NULL;
2213 struct rbd_img_request *parent_request = NULL;
2214 struct rbd_device *rbd_dev;
2217 struct page **pages = NULL;
2221 rbd_assert(obj_request_img_data_test(obj_request));
2222 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2224 img_request = obj_request->img_request;
2225 rbd_assert(img_request != NULL);
2226 rbd_dev = img_request->rbd_dev;
2227 rbd_assert(rbd_dev->parent != NULL);
2230 * First things first. The original osd request is of no
2231 * use to use any more, we'll need a new one that can hold
2232 * the two ops in a copyup request. We'll get that later,
2233 * but for now we can release the old one.
2235 rbd_osd_req_destroy(obj_request->osd_req);
2236 obj_request->osd_req = NULL;
2239 * Determine the byte range covered by the object in the
2240 * child image to which the original request was to be sent.
2242 img_offset = obj_request->img_offset - obj_request->offset;
2243 length = (u64)1 << rbd_dev->header.obj_order;
2246 * There is no defined parent data beyond the parent
2247 * overlap, so limit what we read at that boundary if
2250 if (img_offset + length > rbd_dev->parent_overlap) {
2251 rbd_assert(img_offset < rbd_dev->parent_overlap);
2252 length = rbd_dev->parent_overlap - img_offset;
2256 * Allocate a page array big enough to receive the data read
2259 page_count = (u32)calc_pages_for(0, length);
2260 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2261 if (IS_ERR(pages)) {
2262 result = PTR_ERR(pages);
/* Build a read of the covering range against the parent device */
2268 parent_request = rbd_img_request_create(rbd_dev->parent,
2271 if (!parent_request)
/* Parent request holds a ref on the original object request */
2273 rbd_obj_request_get(obj_request);
2274 parent_request->obj_request = obj_request;
2276 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2279 parent_request->copyup_pages = pages;
2281 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2282 result = rbd_img_request_submit(parent_request);
/* Error path: unwind the parent request and fail the original */
2286 parent_request->copyup_pages = NULL;
2287 parent_request->obj_request = NULL;
2288 rbd_obj_request_put(obj_request);
2291 ceph_release_page_vector(pages, page_count);
2293 rbd_img_request_put(parent_request);
2294 obj_request->result = result;
2295 obj_request->xferred = 0;
2296 obj_request_done_set(obj_request);
/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists on the original object request (success => exists,
 * -ENOENT => does not; anything else is a real error propagated to
 * the original request), then resubmits the original request, which
 * now takes the known-existence fast or slow path.
 */
2301 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2303 struct rbd_obj_request *orig_request;
2306 rbd_assert(!obj_request_img_data_test(obj_request));
2309 * All we need from the object request is the original
2310 * request and the result of the STAT op. Grab those, then
2311 * we're done with the request.
2313 orig_request = obj_request->obj_request;
2314 obj_request->obj_request = NULL;
2315 rbd_assert(orig_request);
2316 rbd_assert(orig_request->img_request);
2318 result = obj_request->result;
2319 obj_request->result = 0;
2321 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2322 obj_request, orig_request, result,
2323 obj_request->xferred, obj_request->length);
2324 rbd_obj_request_put(obj_request);
2326 rbd_assert(orig_request);
2327 rbd_assert(orig_request->img_request);
2330 * Our only purpose here is to determine whether the object
2331 * exists, and we don't want to treat the non-existence as
2332 * an error. If something else comes back, transfer the
2333 * error to the original request and complete it now.
2336 obj_request_existence_set(orig_request, true);
2337 } else if (result == -ENOENT) {
2338 obj_request_existence_set(orig_request, false);
2339 } else if (result) {
2340 orig_request->result = result;
2345 * Resubmit the original request now that we have recorded
2346 * whether the target object exists.
2348 orig_request->result = rbd_img_obj_request_submit(orig_request);
2350 if (orig_request->result)
2351 rbd_obj_request_complete(orig_request);
/* Drop the ref taken when the exists request was submitted */
2352 rbd_obj_request_put(orig_request);
/*
 * Issue a STAT osd request against the same object name as the
 * given object request, to learn whether the target object exists.
 * The stat request holds a reference to the original request
 * (dropped on the error path below or in the exists callback).
 * The page vector sized here receives the STAT response payload.
 */
2355 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2357 struct rbd_obj_request *stat_request;
2358 struct rbd_device *rbd_dev;
2359 struct ceph_osd_client *osdc;
2360 struct page **pages = NULL;
2366 * The response data for a STAT call consists of:
/* size (__le64) + mtime (__le32 secs + __le32 nsecs) */
2373 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2374 page_count = (u32)calc_pages_for(0, size);
2375 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2377 return PTR_ERR(pages);
2380 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2385 rbd_obj_request_get(obj_request);
2386 stat_request->obj_request = obj_request;
2387 stat_request->pages = pages;
2388 stat_request->page_count = page_count;
2390 rbd_assert(obj_request->img_request);
2391 rbd_dev = obj_request->img_request->rbd_dev;
2392 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2394 if (!stat_request->osd_req)
2396 stat_request->callback = rbd_img_obj_exists_callback;
2398 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2399 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2401 rbd_osd_req_format_read(stat_request);
2403 osdc = &rbd_dev->rbd_client->client->osdc;
2404 ret = rbd_obj_request_submit(osdc, stat_request);
/* On failure, drop the reference taken above */
2407 rbd_obj_request_put(obj_request);
/*
 * Submit one object request belonging to an image request,
 * choosing among three paths: (1) a plain osd submit for reads,
 * non-layered writes, writes past the parent overlap, or writes to
 * an object known to exist; (2) a full parent read + copyup when
 * the target is known not to exist; (3) a STAT probe when existence
 * is not yet known.
 */
2412 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2414 struct rbd_img_request *img_request;
2415 struct rbd_device *rbd_dev;
2418 rbd_assert(obj_request_img_data_test(obj_request));
2420 img_request = obj_request->img_request;
2421 rbd_assert(img_request);
2422 rbd_dev = img_request->rbd_dev;
2425 * Only writes to layered images need special handling.
2426 * Reads and non-layered writes are simple object requests.
2427 * Layered writes that start beyond the end of the overlap
2428 * with the parent have no parent data, so they too are
2429 * simple object requests. Finally, if the target object is
2430 * known to already exist, its parent data has already been
2431 * copied, so a write to the object can also be handled as a
2432 * simple object request.
2434 if (!img_request_write_test(img_request) ||
2435 !img_request_layered_test(img_request) ||
2436 rbd_dev->parent_overlap <= obj_request->img_offset ||
2437 ((known = obj_request_known_test(obj_request)) &&
2438 obj_request_exists_test(obj_request))) {
2440 struct rbd_device *rbd_dev;
2441 struct ceph_osd_client *osdc;
2443 rbd_dev = obj_request->img_request->rbd_dev;
2444 osdc = &rbd_dev->rbd_client->client->osdc;
2446 return rbd_obj_request_submit(osdc, obj_request);
2450 * It's a layered write. The target object might exist but
2451 * we may not know that yet. If we know it doesn't exist,
2452 * start by reading the data for the full target object from
2453 * the parent so we can use it for a copyup to the target.
2456 return rbd_img_obj_parent_read_full(obj_request);
2458 /* We don't know whether the target exists. Go find out. */
2460 return rbd_img_obj_exists_submit(obj_request);
/*
 * Submit every object request that makes up an image request.
 * The error handling between the visible lines (stopping on a
 * failed submit) was presumably lost in extraction.
 */
2463 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2465 struct rbd_obj_request *obj_request;
2466 struct rbd_obj_request *next_obj_request;
2468 dout("%s: img %p\n", __func__, img_request);
2469 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2472 ret = rbd_img_obj_request_submit(obj_request);
/*
 * Completion callback for a parent-image read started by
 * rbd_img_parent_read().  Copies the result onto the original
 * object request.  Data past the parent overlap is undefined, so
 * when the object extends beyond the overlap the transfer count is
 * clamped to the overlap boundary -- the read callback below then
 * zero-fills the remainder as if the read came up short.
 */
2480 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2482 struct rbd_obj_request *obj_request;
2483 struct rbd_device *rbd_dev;
2486 rbd_assert(img_request_child_test(img_request));
2488 obj_request = img_request->obj_request;
2489 rbd_assert(obj_request);
2490 rbd_assert(obj_request->img_request);
2492 obj_request->result = img_request->result;
2493 if (obj_request->result)
2497 * We need to zero anything beyond the parent overlap
2498 * boundary. Since rbd_img_obj_request_read_callback()
2499 * will zero anything beyond the end of a short read, an
2500 * easy way to do this is to pretend the data from the
2501 * parent came up short--ending at the overlap boundary.
2503 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2504 obj_end = obj_request->img_offset + obj_request->length;
2505 rbd_dev = obj_request->img_request->rbd_dev;
2506 if (obj_end > rbd_dev->parent_overlap) {
2509 if (obj_request->img_offset < rbd_dev->parent_overlap)
2510 xferred = rbd_dev->parent_overlap -
2511 obj_request->img_offset;
2513 obj_request->xferred = min(img_request->xferred, xferred);
2515 obj_request->xferred = img_request->xferred;
2518 rbd_img_obj_request_read_callback(obj_request);
2519 rbd_obj_request_complete(obj_request);
/*
 * Satisfy an object read that returned -ENOENT on a layered image
 * by reading the same byte range from the parent image.  Builds a
 * child image request against rbd_dev->parent backed by the
 * original request's bio chain, and submits it; the error path at
 * the bottom fails the original request directly.
 */
2522 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2524 struct rbd_device *rbd_dev;
2525 struct rbd_img_request *img_request;
2528 rbd_assert(obj_request_img_data_test(obj_request));
2529 rbd_assert(obj_request->img_request != NULL);
2530 rbd_assert(obj_request->result == (s32) -ENOENT);
2531 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2533 rbd_dev = obj_request->img_request->rbd_dev;
2534 rbd_assert(rbd_dev->parent != NULL);
2535 /* rbd_read_finish(obj_request, obj_request->length); */
2536 img_request = rbd_img_request_create(rbd_dev->parent,
2537 obj_request->img_offset,
2538 obj_request->length,
/* Child image request holds a ref on the original object request */
2544 rbd_obj_request_get(obj_request);
2545 img_request->obj_request = obj_request;
2547 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2548 obj_request->bio_list);
2552 img_request->callback = rbd_img_parent_read_callback;
2553 result = rbd_img_request_submit(img_request);
/* Error path: drop the child request and fail the original */
2560 rbd_img_request_put(img_request);
2561 obj_request->result = result;
2562 obj_request->xferred = 0;
2563 obj_request_done_set(obj_request);
/*
 * Send a NOTIFY_ACK for a watch notification on the header object.
 * Fire-and-forget: the request's callback drops the final reference
 * when the osd reply arrives; we only put our ref on submit failure.
 * NOTE(review): error gotos/labels and return appear elided.
 */
2566 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2568 	struct rbd_obj_request *obj_request;
2569 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2572 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2573 							OBJ_REQUEST_NODATA);
2578 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2579 	if (!obj_request->osd_req)
2581 	obj_request->callback = rbd_obj_request_put;
2583 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2585 	rbd_osd_req_format_read(obj_request);
2587 	ret = rbd_obj_request_submit(osdc, obj_request);
2590 		rbd_obj_request_put(obj_request);
/*
 * Watch callback invoked by the osd client when the header object
 * changes: refresh the device's view of the image header (result
 * intentionally ignored), then acknowledge the notification.
 */
2595 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2597 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
2602 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2603 		rbd_dev->header_name, (unsigned long long)notify_id,
2604 		(unsigned int)opcode);
2605 	(void)rbd_dev_refresh(rbd_dev);
2607 	rbd_obj_notify_ack(rbd_dev, notify_id);
2611  * Request sync osd watch/unwatch.  The value of "start" determines
2612  * whether a watch request is being initiated or torn down.
 *
 * start != 0: create a watch event and register a lingering WATCH
 * request, recording it in rbd_dev->watch_request.
 * start == 0: unregister the lingering request and cancel the event.
 * NOTE(review): several error-path gotos/labels appear elided here;
 * the asserts require state consistent with the chosen direction.
2614 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2616 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2617 	struct rbd_obj_request *obj_request;
2620 	rbd_assert(start ^ !!rbd_dev->watch_event);
2621 	rbd_assert(start ^ !!rbd_dev->watch_request);
2624 		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2625 						&rbd_dev->watch_event);
2628 	rbd_assert(rbd_dev->watch_event != NULL);
2632 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2633 							OBJ_REQUEST_NODATA);
2637 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2638 	if (!obj_request->osd_req)
	/* Starting: mark the request lingering so the osd keeps it
	 * alive; stopping: unregister the previous lingering request. */
2642 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2644 		ceph_osdc_unregister_linger_request(osdc,
2645 					rbd_dev->watch_request->osd_req);
2647 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2648 				rbd_dev->watch_event->cookie, 0, start);
2649 	rbd_osd_req_format_write(obj_request);
2651 	ret = rbd_obj_request_submit(osdc, obj_request);
2654 	ret = rbd_obj_request_wait(obj_request);
2657 	ret = obj_request->result;
2662 	 * A watch request is set to linger, so the underlying osd
2663 	 * request won't go away until we unregister it.  We retain
2664 	 * a pointer to the object request during that time (in
2665 	 * rbd_dev->watch_request), so we'll keep a reference to
2666 	 * it.  We'll drop that reference (below) after we've
2670 		rbd_dev->watch_request = obj_request;
2675 	/* We have successfully torn down the watch request */
2677 	rbd_obj_request_put(rbd_dev->watch_request);
2678 	rbd_dev->watch_request = NULL;
2680 	/* Cancel the event if we're tearing down, or on error */
2681 	ceph_osdc_cancel_event(rbd_dev->watch_event);
2682 	rbd_dev->watch_event = NULL;
2684 	rbd_obj_request_put(obj_request);
2690  * Synchronous osd object method call.  Returns the number of bytes
2691  * returned in the outbound buffer, or a negative error code.
 *
 * Executes class_name.method_name on object_name via CEPH_OSD_OP_CALL.
 * Optional outbound data is sent as a pagelist; the reply is received
 * into a page vector sized for inbound_size and copied to "inbound".
 * NOTE(review): the inbound-copy destination parameter and some error
 * gotos/labels appear elided in this extraction.
2693 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2694 			     const char *object_name,
2695 			     const char *class_name,
2696 			     const char *method_name,
2697 			     const void *outbound,
2698 			     size_t outbound_size,
2700 			     size_t inbound_size)
2702 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2703 	struct rbd_obj_request *obj_request;
2704 	struct page **pages;
2709 	 * Method calls are ultimately read operations.  The result
2710 	 * should placed into the inbound buffer provided.  They
2711 	 * also supply outbound data--parameters for the object
2712 	 * method.  Currently if this is present it will be a
2715 	page_count = (u32)calc_pages_for(0, inbound_size);
2716 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2718 		return PTR_ERR(pages);
2721 	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2726 	obj_request->pages = pages;
2727 	obj_request->page_count = page_count;
2729 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2730 	if (!obj_request->osd_req)
2733 	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2734 					class_name, method_name);
2735 	if (outbound_size) {
2736 		struct ceph_pagelist *pagelist;
2738 		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2742 		ceph_pagelist_init(pagelist);
2743 		ceph_pagelist_append(pagelist, outbound, outbound_size);
2744 		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2747 	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2748 					obj_request->pages, inbound_size,
2750 	rbd_osd_req_format_read(obj_request);
2752 	ret = rbd_obj_request_submit(osdc, obj_request);
2755 	ret = rbd_obj_request_wait(obj_request);
2759 	ret = obj_request->result;
	/* Success: return the byte count actually transferred and
	 * copy the reply pages into the caller's inbound buffer. */
2763 	rbd_assert(obj_request->xferred < (u64)INT_MAX);
2764 	ret = (int)obj_request->xferred;
2765 	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2768 		rbd_obj_request_put(obj_request);
2770 		ceph_release_page_vector(pages, page_count);
/*
 * Block-layer request function.  Pulls requests off the queue, drops
 * the queue lock while building/submitting the corresponding image
 * request to the osd layer, then re-takes the lock.  Rejects non-FS
 * requests, zero-length requests, writes to a read-only mapping,
 * requests against a vanished snapshot, and overflowing ranges.
 * NOTE(review): several branch bodies and the end_request label
 * appear elided in this extraction.
 */
2775 static void rbd_request_fn(struct request_queue *q)
2776 		__releases(q->queue_lock) __acquires(q->queue_lock)
2778 	struct rbd_device *rbd_dev = q->queuedata;
2779 	bool read_only = rbd_dev->mapping.read_only;
2783 	while ((rq = blk_fetch_request(q))) {
2784 		bool write_request = rq_data_dir(rq) == WRITE;
2785 		struct rbd_img_request *img_request;
2789 		/* Ignore any non-FS requests that filter through. */
2791 		if (rq->cmd_type != REQ_TYPE_FS) {
2792 			dout("%s: non-fs request type %d\n", __func__,
2793 				(int) rq->cmd_type);
2794 			__blk_end_request_all(rq, 0);
2798 		/* Ignore/skip any zero-length requests */
2800 		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2801 		length = (u64) blk_rq_bytes(rq);
2804 			dout("%s: zero-length request\n", __func__);
2805 			__blk_end_request_all(rq, 0);
	/* The osd submission path can sleep; release the queue lock. */
2809 		spin_unlock_irq(q->queue_lock);
2811 		/* Disallow writes to a read-only device */
2813 		if (write_request) {
2817 			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2821 		 * Quit early if the mapped snapshot no longer
2822 		 * exists.  It's still possible the snapshot will
2823 		 * have disappeared by the time our request arrives
2824 		 * at the osd, but there's no sense in sending it if
2827 		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2828 			dout("request for non-existent snapshot");
2829 			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
	/* Guard against u64 wraparound of offset + length. */
2835 		if (offset && length > U64_MAX - offset + 1) {
2836 			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2838 			goto end_request;	/* Shouldn't happen */
2842 		img_request = rbd_img_request_create(rbd_dev, offset, length,
2843 							write_request, false);
2847 		img_request->rq = rq;
2849 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2852 			result = rbd_img_request_submit(img_request);
2854 			rbd_img_request_put(img_request);
2856 		spin_lock_irq(q->queue_lock);
2858 			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2859 				write_request ? "write" : "read",
2860 				length, offset, result);
2862 			__blk_end_request_all(rq, result);
2868  * a queue callback. Makes sure that we don't create a bio that spans across
2869  * multiple osd objects. One exception would be with a single page bios,
2870  * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of bvec that may be added to the bio at
 * bmd without crossing an rbd object boundary.
2872 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2873 			  struct bio_vec *bvec)
2875 	struct rbd_device *rbd_dev = q->queuedata;
2876 	sector_t sector_offset;
2877 	sector_t sectors_per_obj;
2878 	sector_t obj_sector_offset;
2882 	 * Find how far into its rbd object the partition-relative
2883 	 * bio start sector is to offset relative to the enclosing
	/* sectors_per_obj is a power of two (obj_order >= SECTOR_SHIFT),
	 * so the mask below extracts the within-object sector offset. */
2886 	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2887 	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2888 	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2891 	 * Compute the number of bytes from that offset to the end
2892 	 * of the object.  Account for what's already used by the bio.
2894 	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2895 	if (ret > bmd->bi_size)
2896 		ret -= bmd->bi_size;
2901 	 * Don't send back more than was asked for.  And if the bio
2902 	 * was empty, let the whole thing through because:  "Note
2903 	 * that a block device *must* allow a single page to be
2904 	 * added to an empty bio."
2906 	rbd_assert(bvec->bv_len <= PAGE_SIZE);
2907 	if (ret > (int) bvec->bv_len || !bmd->bi_size)
2908 		ret = (int) bvec->bv_len;
/*
 * Tear down the gendisk for an rbd device: detach it from the device,
 * and (if it was added) clean up its request queue.
 * NOTE(review): the NULL check on disk, del_gendisk/put_disk calls and
 * braces appear elided in this extraction.
 */
2913 static void rbd_free_disk(struct rbd_device *rbd_dev)
2915 	struct gendisk *disk = rbd_dev->disk;
2920 	rbd_dev->disk = NULL;
2921 	if (disk->flags & GENHD_FL_UP) {
2924 		blk_cleanup_queue(disk->queue);
/*
 * Synchronously read "length" bytes at "offset" from the named rados
 * object into "buf".  Allocates a page vector for the reply, submits a
 * CEPH_OSD_OP_READ, waits, then copies the transferred bytes out.
 * Returns the (non-negative) number of bytes read or a negative errno.
 * NOTE(review): error gotos/labels and the final return appear elided.
 */
2929 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2930 				const char *object_name,
2931 				u64 offset, u64 length, void *buf)
2934 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2935 	struct rbd_obj_request *obj_request;
2936 	struct page **pages = NULL;
2941 	page_count = (u32) calc_pages_for(offset, length);
2942 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2944 		ret = PTR_ERR(pages);
2947 	obj_request = rbd_obj_request_create(object_name, offset, length,
2952 	obj_request->pages = pages;
2953 	obj_request->page_count = page_count;
2955 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2956 	if (!obj_request->osd_req)
2959 	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2960 					offset, length, 0, 0);
2961 	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2963 					obj_request->length,
2964 					obj_request->offset & ~PAGE_MASK,
2966 	rbd_osd_req_format_read(obj_request);
2968 	ret = rbd_obj_request_submit(osdc, obj_request);
2971 	ret = rbd_obj_request_wait(obj_request);
2975 	ret = obj_request->result;
	/* xferred must fit in both size_t and int for the return value. */
2979 	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2980 	size = (size_t) obj_request->xferred;
2981 	ceph_copy_from_page_vector(pages, buf, 0, size);
2982 	rbd_assert(size <= (size_t)INT_MAX);
2986 		rbd_obj_request_put(obj_request);
2988 		ceph_release_page_vector(pages, page_count);
2994  * Read the complete header for the given rbd device.
2996  * Returns a pointer to a dynamically-allocated buffer containing
2997  * the complete and validated header.  Caller can pass the address
2998  * of a variable that will be filled in with the version of the
2999  * header object at the time it was read.
3001  * Returns a pointer-coded errno if a failure occurs.
 *
 * Reads in a loop: if the snapshot count changed between sizing the
 * buffer and reading it, the buffer is re-sized and the read retried.
 * NOTE(review): loop header, kfree of a stale buffer, and error
 * gotos/labels appear elided in this extraction.
3003 static struct rbd_image_header_ondisk *
3004 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3006 	struct rbd_image_header_ondisk *ondisk = NULL;
3013 	 * The complete header will include an array of its 64-bit
3014 	 * snapshot ids, followed by the names of those snapshots as
3015 	 * a contiguous block of NUL-terminated strings.  Note that
3016 	 * the number of snapshots could change by the time we read
3017 	 * it in, in which case we re-read it.
3024 		size = sizeof (*ondisk);
3025 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3027 		ondisk = kmalloc(size, GFP_KERNEL);
3029 			return ERR_PTR(-ENOMEM);
3031 		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
	/* A short read means we got less than the sized header. */
3035 		if ((size_t)ret < size) {
3037 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3041 		if (!rbd_dev_ondisk_valid(ondisk)) {
3043 			rbd_warn(rbd_dev, "invalid header");
3047 		names_size = le64_to_cpu(ondisk->snap_names_len);
3048 		want_count = snap_count;
3049 		snap_count = le32_to_cpu(ondisk->snap_count);
3050 	} while (snap_count != want_count);
3057 	return ERR_PTR(ret);
3061  * reload the ondisk the header
 *
 * Read the v1 on-disk header and decode it into *header.  Returns 0 on
 * success or a negative errno; the ondisk buffer is freed either way.
 * NOTE(review): the IS_ERR check and kfree(ondisk) appear elided.
3063 static int rbd_read_header(struct rbd_device *rbd_dev,
3064 			   struct rbd_image_header *header)
3066 	struct rbd_image_header_ondisk *ondisk;
3069 	ondisk = rbd_dev_v1_header_read(rbd_dev);
3071 		return PTR_ERR(ondisk);
3072 	ret = rbd_header_from_disk(header, ondisk);
/*
 * Unlink and destroy every snapshot device on the rbd device's snap
 * list.  Uses the _safe iterator because entries are deleted in-loop.
 */
3078 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3080 	struct rbd_snap *snap;
3081 	struct rbd_snap *next;
3083 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3084 		list_del(&snap->node);
3085 		rbd_snap_destroy(snap);
/*
 * If the base image (not a snapshot) is mapped and the header's image
 * size changed, update the mapping size and the gendisk capacity.
 * Snapshot mappings have a fixed size and are left untouched.
 */
3089 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3091 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3094 	if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3097 		rbd_dev->mapping.size = rbd_dev->header.image_size;
3098 		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3099 		dout("setting size to %llu sectors", (unsigned long long)size);
3100 		set_capacity(rbd_dev->disk, size);
3105  * only read the first part of the ondisk header, without the snaps info
 *
 * Refresh a format-1 image: re-read the v1 header, then, under the
 * header rwsem, swap in the new size/snapshot context/snap names and
 * sizes, and update the snapshot list.  Returns 0 or a negative errno.
3107 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3110 	struct rbd_image_header h;
3112 	ret = rbd_read_header(rbd_dev, &h);
3116 	down_write(&rbd_dev->header_rwsem);
3118 	/* Update image size, and check for resize of mapped image */
3119 	rbd_dev->header.image_size = h.image_size;
3120 	rbd_update_mapping_size(rbd_dev);
3122 	/* rbd_dev->header.object_prefix shouldn't change */
3123 	kfree(rbd_dev->header.snap_sizes);
3124 	kfree(rbd_dev->header.snap_names);
3125 	/* osd requests may still refer to snapc */
3126 	ceph_put_snap_context(rbd_dev->header.snapc);
3128 	rbd_dev->header.image_size = h.image_size;
3129 	rbd_dev->header.snapc = h.snapc;
3130 	rbd_dev->header.snap_names = h.snap_names;
3131 	rbd_dev->header.snap_sizes = h.snap_sizes;
3132 	/* Free the extra copy of the object prefix */
3133 	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3134 		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3135 	kfree(h.object_prefix);
3137 	ret = rbd_dev_snaps_update(rbd_dev);
3139 	up_write(&rbd_dev->header_rwsem);
/*
 * Refresh the image header, dispatching to the format-1 or format-2
 * path under ctl_mutex.  If the image size changed, revalidate the
 * disk so the block layer sees the new capacity.
 */
3144 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3149 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	/* Snapshot the size so we can detect a resize afterwards. */
3150 	image_size = rbd_dev->header.image_size;
3151 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3152 	if (rbd_dev->image_format == 1)
3153 		ret = rbd_dev_v1_refresh(rbd_dev);
3155 		ret = rbd_dev_v2_refresh(rbd_dev);
3156 	mutex_unlock(&ctl_mutex);
3158 		rbd_warn(rbd_dev, "got notification but failed to "
3159 			   " update snaps: %d\n", ret);
3160 	if (image_size != rbd_dev->header.image_size)
3161 		revalidate_disk(rbd_dev->disk);
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device: set name/major/fops, create the queue with rbd_request_fn,
 * and size all I/O limits to the rbd object size so requests don't
 * straddle objects.  NOTE(review): allocation-failure branches, the
 * segment_size declaration, and the return appear elided here.
 */
3166 static int rbd_init_disk(struct rbd_device *rbd_dev)
3168 	struct gendisk *disk;
3169 	struct request_queue *q;
3172 	/* create gendisk info */
3173 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3177 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3179 	disk->major = rbd_dev->major;
3180 	disk->first_minor = 0;
3181 	disk->fops = &rbd_bd_ops;
3182 	disk->private_data = rbd_dev;
3184 	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3188 	/* We use the default size, but let's be explicit about it. */
3189 	blk_queue_physical_block_size(q, SECTOR_SIZE);
3191 	/* set io sizes to object size */
3192 	segment_size = rbd_obj_bytes(&rbd_dev->header);
3193 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3194 	blk_queue_max_segment_size(q, segment_size);
3195 	blk_queue_io_min(q, segment_size);
3196 	blk_queue_io_opt(q, segment_size);
	/* Keep bios from spanning osd objects (see rbd_merge_bvec). */
3198 	blk_queue_merge_bvec(q, rbd_merge_bvec);
3201 	q->queuedata = rbd_dev;
3203 	rbd_dev->disk = disk;
/* Map an embedded struct device back to its owning rbd_device. */
3216 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3218 	return container_of(dev, struct rbd_device, dev);
/* sysfs: show the mapped image/snapshot size in bytes. */
3221 static ssize_t rbd_size_show(struct device *dev,
3222 			     struct device_attribute *attr, char *buf)
3224 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3226 	return sprintf(buf, "%llu\n",
3227 		(unsigned long long)rbd_dev->mapping.size);
3231  * Note this shows the features for whatever's mapped, which is not
3232  * necessarily the base image.
 *
 * sysfs: show the mapping's feature bits as a 64-bit hex mask.
3234 static ssize_t rbd_features_show(struct device *dev,
3235 			     struct device_attribute *attr, char *buf)
3237 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3239 	return sprintf(buf, "0x%016llx\n",
3240 			(unsigned long long)rbd_dev->mapping.features);
/*
 * sysfs: show the device's block major number, or "(none)".
 * NOTE(review): the conditional selecting between the two returns
 * appears elided in this extraction — presumably tests rbd_dev->major.
 */
3243 static ssize_t rbd_major_show(struct device *dev,
3244 			      struct device_attribute *attr, char *buf)
3246 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3249 		return sprintf(buf, "%d\n", rbd_dev->major);
3251 	return sprintf(buf, "(none)\n");
/* sysfs: show the ceph client instance id ("client<N>"). */
3255 static ssize_t rbd_client_id_show(struct device *dev,
3256 				  struct device_attribute *attr, char *buf)
3258 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3260 	return sprintf(buf, "client%lld\n",
3261 			ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs: show the name of the pool the image lives in. */
3264 static ssize_t rbd_pool_show(struct device *dev,
3265 			     struct device_attribute *attr, char *buf)
3267 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3269 	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
/* sysfs: show the numeric id of the image's pool. */
3272 static ssize_t rbd_pool_id_show(struct device *dev,
3273 			     struct device_attribute *attr, char *buf)
3275 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3277 	return sprintf(buf, "%llu\n",
3278 			(unsigned long long) rbd_dev->spec->pool_id);
/* sysfs: show the image name, or "(unknown)" if it wasn't resolved. */
3281 static ssize_t rbd_name_show(struct device *dev,
3282 			     struct device_attribute *attr, char *buf)
3284 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3286 	if (rbd_dev->spec->image_name)
3287 		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3289 	return sprintf(buf, "(unknown)\n");
/* sysfs: show the image id string. */
3292 static ssize_t rbd_image_id_show(struct device *dev,
3293 			     struct device_attribute *attr, char *buf)
3295 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3297 	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3301  * Shows the name of the currently-mapped snapshot (or
3302  * RBD_SNAP_HEAD_NAME for the base image).
3304 static ssize_t rbd_snap_show(struct device *dev,
3305 			     struct device_attribute *attr,
3308 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3310 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3314  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3315  * for the parent image.  If there is no parent, simply shows
3316  * "(no parent image)".
 *
 * Builds the multi-line reply incrementally in buf via bufp, and
 * returns the total byte count written.
 * NOTE(review): the bufp declaration/initialization and the bufp +=
 * count advancing lines appear elided in this extraction.
3318 static ssize_t rbd_parent_show(struct device *dev,
3319 			     struct device_attribute *attr,
3322 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3323 	struct rbd_spec *spec = rbd_dev->parent_spec;
3328 		return sprintf(buf, "(no parent image)\n");
3330 	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3331 			(unsigned long long) spec->pool_id, spec->pool_name);
3336 	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3337 			spec->image_name ? spec->image_name : "(unknown)");
3342 	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3343 			(unsigned long long) spec->snap_id, spec->snap_name);
3348 	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3353 	return (ssize_t) (bufp - buf);
/*
 * sysfs: writing to "refresh" triggers a header refresh.  Returns the
 * write size on success, or the refresh error.
 */
3356 static ssize_t rbd_image_refresh(struct device *dev,
3357 				 struct device_attribute *attr,
3361 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3364 	ret = rbd_dev_refresh(rbd_dev);
3366 	return ret < 0 ? ret : size;
/*
 * sysfs attribute definitions.  All are read-only except "refresh",
 * which is write-only (S_IWUSR) and triggers rbd_image_refresh().
 */
3369 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3370 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3371 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3372 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3373 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3374 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3375 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3376 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3377 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3378 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3379 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
/*
 * Attribute table, group, and device type wiring for rbd devices.
 * NOTE(review): the group's .attrs assignment, the groups array
 * entries, and the release body appear elided in this extraction.
 */
3381 static struct attribute *rbd_attrs[] = {
3382 	&dev_attr_size.attr,
3383 	&dev_attr_features.attr,
3384 	&dev_attr_major.attr,
3385 	&dev_attr_client_id.attr,
3386 	&dev_attr_pool.attr,
3387 	&dev_attr_pool_id.attr,
3388 	&dev_attr_name.attr,
3389 	&dev_attr_image_id.attr,
3390 	&dev_attr_current_snap.attr,
3391 	&dev_attr_parent.attr,
3392 	&dev_attr_refresh.attr,
3396 static struct attribute_group rbd_attr_group = {
3400 static const struct attribute_group *rbd_attr_groups[] = {
3405 static void rbd_sysfs_dev_release(struct device *dev)
3409 static struct device_type rbd_device_type = {
3411 	.groups		= rbd_attr_groups,
3412 	.release	= rbd_sysfs_dev_release,
/* Take a reference on an rbd image spec (returns the spec). */
3415 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3417 	kref_get(&spec->kref);
/* Drop a reference on a spec; frees it via rbd_spec_free() at zero. */
3422 static void rbd_spec_free(struct kref *kref);
3423 static void rbd_spec_put(struct rbd_spec *spec)
3426 		kref_put(&spec->kref, rbd_spec_free);
/*
 * Allocate a zeroed image spec with an initial reference.
 * Returns NULL on allocation failure.
 */
3429 static struct rbd_spec *rbd_spec_alloc(void)
3431 	struct rbd_spec *spec;
3433 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3436 	kref_init(&spec->kref);
/*
 * kref release function: free a spec's owned name strings and the
 * spec itself (kfree(NULL) is safe for any unset field).
 */
3441 static void rbd_spec_free(struct kref *kref)
3443 	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3445 	kfree(spec->pool_name);
3446 	kfree(spec->image_id);
3447 	kfree(spec->image_name);
3448 	kfree(spec->snap_name);
/*
 * Allocate and initialize an rbd_device.  Takes ownership of the
 * caller's references on rbdc and spec (they are dropped by
 * rbd_dev_destroy()).  The ceph file layout is fixed: one object per
 * stripe, object/stripe size 1 << RBD_MAX_OBJ_ORDER, in spec's pool.
 */
3452 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3453 				struct rbd_spec *spec)
3455 	struct rbd_device *rbd_dev;
3457 	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3461 	spin_lock_init(&rbd_dev->lock);
3463 	INIT_LIST_HEAD(&rbd_dev->node);
3464 	INIT_LIST_HEAD(&rbd_dev->snaps);
3465 	init_rwsem(&rbd_dev->header_rwsem);
3467 	rbd_dev->spec = spec;
3468 	rbd_dev->rbd_client = rbdc;
3470 	/* Initialize the layout used for all rbd requests */
3472 	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3473 	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3474 	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3475 	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/*
 * Release an rbd_device: drop the client and spec references taken
 * by rbd_dev_create().  NOTE(review): kfree(rbd_dev) appears elided.
 */
3480 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3482 	rbd_put_client(rbd_dev->rbd_client);
3483 	rbd_spec_put(rbd_dev->spec);
/*
 * Free a snapshot entry.
 * NOTE(review): the body (freeing snap->name and snap) is elided in
 * this extraction — confirm against the full file.
 */
3487 static void rbd_snap_destroy(struct rbd_snap *snap)
/*
 * Allocate a snapshot entry and populate it with the given name, id,
 * size and features.  Takes ownership of snap_name.  Returns the new
 * entry or a pointer-coded -ENOMEM.
 * NOTE(review): the snap->id assignment and return appear elided.
 */
3493 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3494 					   const char *snap_name,
3495 					   u64 snap_id, u64 snap_size,
3498 	struct rbd_snap *snap;
3500 	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3502 		return ERR_PTR(-ENOMEM);
3504 	snap->name = snap_name;
3506 	snap->size = snap_size;
3507 	snap->features = snap_features;
3513  * Returns a dynamically-allocated snapshot name if successful, or a
3514  * pointer-coded error otherwise.
 *
 * Format-1 lookup: map snap_id to its index, fetch the name and size
 * from the header arrays; v1 images never have per-snap features.
3516 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev,
3517 		u64 snap_id, u64 *snap_size, u64 *snap_features)
3519 	const char *snap_name;
3522 	which = rbd_dev_snap_index(rbd_dev, snap_id);
3523 	if (which == BAD_SNAP_INDEX)
3524 		return ERR_PTR(-ENOENT);
3525 	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
3527 		return ERR_PTR(-ENOMEM);
3529 	*snap_size = rbd_dev->header.snap_sizes[which];
3530 	*snap_features = 0;	/* No features for v1 */
3536  * Get the size and object order for an image snapshot, or if
3537  * snap_id is CEPH_NOSNAP, gets this information for the base
 *
 * Calls the "rbd"."get_size" object class method on the header object
 * and decodes the little-endian {order, size} reply into *order and
 * *snap_size.  NOTE(review): the method-name argument and error
 * branches appear elided in this extraction.
3540 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3541 				u8 *order, u64 *snap_size)
3543 	__le64 snapid = cpu_to_le64(snap_id);
3548 	} __attribute__ ((packed)) size_buf = { 0 };
3550 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3552 				&snapid, sizeof (snapid),
3553 				&size_buf, sizeof (size_buf));
3554 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	/* A reply shorter than the buffer means a truncated answer. */
3557 	if (ret < sizeof (size_buf))
3561 		*order = size_buf.order;
3562 	*snap_size = le64_to_cpu(size_buf.size);
3564 	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3565 		(unsigned long long)snap_id, (unsigned int)*order,
3566 		(unsigned long long)*snap_size);
/* Fetch object order and size of the base image (CEPH_NOSNAP). */
3571 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3573 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3574 					&rbd_dev->header.obj_order,
3575 					&rbd_dev->header.image_size);
/*
 * Fetch the v2 image's object-name prefix via "rbd"."get_object_prefix"
 * and store a decoded copy in rbd_dev->header.object_prefix.
 * NOTE(review): reply_buf/p declarations, error branches, and the
 * kfree of reply_buf appear elided in this extraction.
 */
3578 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3584 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3588 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3589 				"rbd", "get_object_prefix", NULL, 0,
3590 				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3591 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3596 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3597 						p + ret, NULL, GFP_NOIO);
3600 	if (IS_ERR(rbd_dev->header.object_prefix)) {
3601 		ret = PTR_ERR(rbd_dev->header.object_prefix);
3602 		rbd_dev->header.object_prefix = NULL;
3604 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for a snapshot (or the base image when
 * snap_id is CEPH_NOSNAP) via "rbd"."get_features".  Fails if any
 * incompatible feature bit outside RBD_FEATURES_SUPPORTED is set.
 * NOTE(review): error branches/returns appear elided here.
 */
3612 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3615 	__le64 snapid = cpu_to_le64(snap_id);
3619 	} __attribute__ ((packed)) features_buf = { 0 };
3623 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3624 				"rbd", "get_features",
3625 				&snapid, sizeof (snapid),
3626 				&features_buf, sizeof (features_buf));
3627 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3630 	if (ret < sizeof (features_buf))
3633 	incompat = le64_to_cpu(features_buf.incompat);
3634 	if (incompat & ~RBD_FEATURES_SUPPORTED)
3637 	*snap_features = le64_to_cpu(features_buf.features);
3639 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3640 		(unsigned long long)snap_id,
3641 		(unsigned long long)*snap_features,
3642 		(unsigned long long)le64_to_cpu(features_buf.incompat));
/* Fetch the base image's feature bits into the in-core header. */
3647 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3649 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3650 						&rbd_dev->header.features);
/*
 * Query the v2 image's parent (for layered/cloned images) via
 * "rbd"."get_parent".  Decodes pool id, image id, snap id and overlap
 * from the reply; a pool id of CEPH_NOPOOL means no parent.  On
 * success the rbd_dev takes ownership of the filled-in parent_spec.
 * NOTE(review): declarations, allocation-failure branches, and the
 * out/out_err labels appear elided in this extraction.
 */
3653 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3655 	struct rbd_spec *parent_spec;
3657 	void *reply_buf = NULL;
3665 	parent_spec = rbd_spec_alloc();
3669 	size = sizeof (__le64) +				/* pool_id */
3670 		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
3671 		sizeof (__le64) +				/* snap_id */
3672 		sizeof (__le64);				/* overlap */
3673 	reply_buf = kmalloc(size, GFP_KERNEL);
3679 	snapid = cpu_to_le64(CEPH_NOSNAP);
3680 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3681 				"rbd", "get_parent",
3682 				&snapid, sizeof (snapid),
3684 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3689 	end = reply_buf + ret;
3691 	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3692 	if (parent_spec->pool_id == CEPH_NOPOOL)
3693 		goto out;	/* No parent?  No problem. */
3695 	/* The ceph file layout needs to fit pool id in 32 bits */
3698 	if (parent_spec->pool_id > (u64)U32_MAX) {
3699 		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3700 			(unsigned long long)parent_spec->pool_id, U32_MAX);
3704 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3705 	if (IS_ERR(image_id)) {
3706 		ret = PTR_ERR(image_id);
3709 	parent_spec->image_id = image_id;
3710 	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3711 	ceph_decode_64_safe(&p, end, overlap, out_err);
3713 	rbd_dev->parent_overlap = overlap;
3714 	rbd_dev->parent_spec = parent_spec;
3715 	parent_spec = NULL;	/* rbd_dev now owns this */
3720 	rbd_spec_put(parent_spec);
/*
 * Query the v2 image's striping parameters via
 * "rbd"."get_stripe_unit_count".  Fancy striping (STRIPINGV2) is not
 * supported, so fail unless the image uses the defaults: stripe unit
 * equal to the object size and a stripe count of 1.
 * NOTE(review): declarations, -EINVAL returns in the mismatch
 * branches, and the final return appear elided in this extraction.
 */
3725 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3729 		__le64 stripe_count;
3730 	} __attribute__ ((packed)) striping_info_buf = { 0 };
3731 	size_t size = sizeof (striping_info_buf);
3738 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3739 				"rbd", "get_stripe_unit_count", NULL, 0,
3740 				(char *)&striping_info_buf, size);
3741 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3748 	 * We don't actually support the "fancy striping" feature
3749 	 * (STRIPINGV2) yet, but if the striping sizes are the
3750 	 * defaults the behavior is the same as before.  So find
3751 	 * out, and only fail if the image has non-default values.
3754 	obj_size = (u64)1 << rbd_dev->header.obj_order;
3755 	p = &striping_info_buf;
3756 	stripe_unit = ceph_decode_64(&p);
3757 	if (stripe_unit != obj_size) {
3758 		rbd_warn(rbd_dev, "unsupported stripe unit "
3759 				"(got %llu want %llu)",
3760 				stripe_unit, obj_size);
3763 	stripe_count = ceph_decode_64(&p);
3764 	if (stripe_count != 1) {
3765 		rbd_warn(rbd_dev, "unsupported stripe count "
3766 				"(got %llu want 1)", stripe_count);
3769 	rbd_dev->header.stripe_unit = stripe_unit;
3770 	rbd_dev->header.stripe_count = stripe_count;
/*
 * Reverse-map an image id to its name by calling "rbd"."dir_get_name"
 * on the RBD_DIRECTORY object.  The image id is passed ceph-encoded
 * (length-prefixed string); the reply is decoded the same way.
 * Returns a dynamically-allocated name, or NULL on any failure
 * (callers tolerate a missing name).
 * NOTE(review): allocation-failure branches, out labels, and kfrees
 * of image_id/reply_buf appear elided in this extraction.
 */
3775 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3777 	size_t image_id_size;
3782 	void *reply_buf = NULL;
3784 	char *image_name = NULL;
3787 	rbd_assert(!rbd_dev->spec->image_name);
3789 	len = strlen(rbd_dev->spec->image_id);
3790 	image_id_size = sizeof (__le32) + len;
3791 	image_id = kmalloc(image_id_size, GFP_KERNEL);
3796 	end = image_id + image_id_size;
3797 	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3799 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3800 	reply_buf = kmalloc(size, GFP_KERNEL);
3804 	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3805 				"rbd", "dir_get_name",
3806 				image_id, image_id_size,
3811 	end = reply_buf + ret;
3813 	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3814 	if (IS_ERR(image_name))
3817 		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
/*
 * Format-1 name->id lookup: walk the packed NUL-terminated name block
 * in parallel with the snapshot id array and return the matching id.
 * NOTE(review): the loop-increment of "which" and the CEPH_NOSNAP
 * not-found return appear elided in this extraction.
 */
3825 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3827 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3828 	const char *snap_name;
3831 	/* Skip over names until we find the one we are looking for */
3833 	snap_name = rbd_dev->header.snap_names;
3834 	while (which < snapc->num_snaps) {
3835 		if (!strcmp(name, snap_name))
3836 			return snapc->snaps[which];
3837 		snap_name += strlen(snap_name) + 1;
/*
 * Format-2 name->id lookup: for each snapshot id in the context,
 * fetch its name from the osd and compare.  Returns the matching id
 * or CEPH_NOSNAP if no snapshot has that name (or a fetch fails).
 */
3843 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3845 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3850 	for (which = 0; !found && which < snapc->num_snaps; which++) {
3851 		const char *snap_name;
3853 		snap_id = snapc->snaps[which];
3854 		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3855 		if (IS_ERR(snap_name))
3857 		found = !strcmp(name, snap_name);
3860 	return found ? snap_id : CEPH_NOSNAP;
3864  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3865  * no snapshot by that name is found, or if an error occurs.
 *
 * Dispatches to the format-1 or format-2 lookup.
3867 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3869 	if (rbd_dev->image_format == 1)
3870 		return rbd_v1_snap_id_by_name(rbd_dev, name);
3872 	return rbd_v2_snap_id_by_name(rbd_dev, name);
3876  * When an rbd image has a parent image, it is identified by the
3877  * pool, image, and snapshot ids (not names).  This function fills
3878  * in the names for those ids.  (It's OK if we can't figure out the
3879  * name for an image id, but the pool and snapshot ids should always
3880  * exist and have names.)  All names in an rbd spec are dynamically
3883  * When an image being mapped (not a parent) is probed, we have the
3884  * pool name and pool id, image name and image id, and the snapshot
3885  * name.  The only thing we're missing is the snapshot id.
3887  * The set of snapshots for an image is not known until they have
3888  * been read by rbd_dev_snaps_update(), so we can't completely fill
3889  * in this information until after that has been called.
 *
 * NOTE(review): error returns/gotos and the out_err label appear
 * elided in this extraction.
3891 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3893 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3894 	struct rbd_spec *spec = rbd_dev->spec;
3895 	const char *pool_name;
3896 	const char *image_name;
3897 	const char *snap_name;
3901 	 * An image being mapped will have the pool name (etc.), but
3902 	 * we need to look up the snapshot id.
3904 	if (spec->pool_name) {
3905 		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3908 			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3909 			if (snap_id == CEPH_NOSNAP)
3911 			spec->snap_id = snap_id;
3913 			spec->snap_id = CEPH_NOSNAP;
3919 	/* Get the pool name; we have to make our own copy of this */
3921 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3923 		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3926 	pool_name = kstrdup(pool_name, GFP_KERNEL);
3930 	/* Fetch the image name; tolerate failure here */
3932 	image_name = rbd_dev_image_name(rbd_dev);
3934 		rbd_warn(rbd_dev, "unable to get image name");
3936 	/* Look up the snapshot name, and make a copy */
3938 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	/* Spec takes ownership of the three dynamically-allocated names. */
3944 	spec->pool_name = pool_name;
3945 	spec->image_name = image_name;
3946 	spec->snap_name = snap_name;
3956 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
/* Fetch and decode the image's snapshot context via the "get_snapcontext"
 * object class method, storing the result in rbd_dev->header.snapc. */
3965 struct ceph_snap_context *snapc;
3969 * We'll need room for the seq value (maximum snapshot id),
3970 * snapshot count, and array of that many snapshot ids.
3971 * For now we have a fixed upper limit on the number we're
3972 * prepared to receive.
3974 size = sizeof (__le64) + sizeof (__le32) +
3975 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3976 reply_buf = kzalloc(size, GFP_KERNEL);
3980 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3981 "rbd", "get_snapcontext", NULL, 0,
3983 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3988 end = reply_buf + ret;
/* Decode the header: seq (u64) then snapshot count (u32); the *_safe
 * decode macros jump to the error label on a short buffer. */
3990 ceph_decode_64_safe(&p, end, seq, out);
3991 ceph_decode_32_safe(&p, end, snap_count, out);
3994 * Make sure the reported number of snapshot ids wouldn't go
3995 * beyond the end of our buffer. But before checking that,
3996 * make sure the computed size of the snapshot context we
3997 * allocate is representable in a size_t.
3999 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4004 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4008 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
/* Copy the decoded snapshot ids into the freshly created context. */
4014 for (i = 0; i < snap_count; i++)
4015 snapc->snaps[i] = ceph_decode_64(&p);
4017 rbd_dev->header.snapc = snapc;
4019 dout(" snap context seq = %llu, snap_count = %u\n",
4020 (unsigned long long)seq, (unsigned int)snap_count);
4027 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
/* Look up a snapshot's name by id using the "get_snapshot_name" class
 * method. Returns a dynamically allocated string (caller frees) or an
 * ERR_PTR on failure. */
4038 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4039 reply_buf = kmalloc(size, GFP_KERNEL);
4041 return ERR_PTR(-ENOMEM);
/* The method takes the snapshot id, little-endian encoded. */
4043 snapid = cpu_to_le64(snap_id);
4044 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4045 "rbd", "get_snapshot_name",
4046 &snapid, sizeof (snapid),
4048 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4050 snap_name = ERR_PTR(ret);
4055 end = reply_buf + ret;
/* The reply is a length-prefixed string; extract an allocated copy. */
4056 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4057 if (IS_ERR(snap_name))
4060 dout(" snap_id 0x%016llx snap_name = %s\n",
4061 (unsigned long long)snap_id, snap_name);
4068 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev,
4069 u64 snap_id, u64 *snap_size, u64 *snap_features)
/* Gather a v2 snapshot's size, features, and name. The name (or an
 * ERR_PTR) is returned; size/features are returned via out-parameters
 * only on success. */
4073 const char *snap_name;
4076 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
4080 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
4084 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4085 if (!IS_ERR(snap_name)) {
/* Only fill the out-parameters once everything has succeeded. */
4087 *snap_features = features;
4092 return ERR_PTR(ret);
4095 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev,
4096 u64 snap_id, u64 *snap_size, u64 *snap_features)
4098 if (rbd_dev->image_format == 1)
4099 return rbd_dev_v1_snap_info(rbd_dev, snap_id,
4100 snap_size, snap_features);
4101 if (rbd_dev->image_format == 2)
4102 return rbd_dev_v2_snap_info(rbd_dev, snap_id,
4103 snap_size, snap_features);
4104 return ERR_PTR(-EINVAL);
4107 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
/* Re-read a v2 image's size and snapshot context, then reconcile the
 * in-memory snapshot list. The header rwsem is held for write across
 * the whole update. */
4111 down_write(&rbd_dev->header_rwsem);
4113 ret = rbd_dev_v2_image_size(rbd_dev);
4116 rbd_update_mapping_size(rbd_dev);
4118 ret = rbd_dev_v2_snap_context(rbd_dev);
4119 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4122 ret = rbd_dev_snaps_update(rbd_dev);
4123 dout("rbd_dev_snaps_update returned %d\n", ret);
4127 up_write(&rbd_dev->header_rwsem);
4133 * Scan the rbd device's current snapshot list and compare it to the
4134 * newly-received snapshot context. Remove any existing snapshots
4135 * not present in the new snapshot context. Add a new snapshot for
4136 * any snapshots in the snapshot context not in the current list.
4137 * And verify there are no changes to snapshots we already know
4140 * Assumes the snapshots in the snapshot context are sorted by
4141 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4142 * are also maintained in that order.)
4144 * Note that any error that occurs while updating the snapshot list
4145 * aborts the update, and the entire list is cleared. The snapshot
4146 * list becomes inconsistent at that point anyway, so it might as
4149 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
/* Merge-walk the existing snapshot list against the (sorted) snapshot
 * context: remove stale entries, add new ones, verify the rest. */
4151 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4152 const u32 snap_count = snapc->num_snaps;
4153 struct list_head *head = &rbd_dev->snaps;
4154 struct list_head *links = head->next;
4158 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4159 while (index < snap_count || links != head) {
4161 struct rbd_snap *snap;
4162 const char *snap_name;
4164 u64 snap_features = 0;
/* Current candidates: next context id (or CEPH_NOSNAP when exhausted)
 * and next existing list entry (or NULL at the end of the list). */
4166 snap_id = index < snap_count ? snapc->snaps[index]
4168 snap = links != head ? list_entry(links, struct rbd_snap, node)
4170 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4172 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4173 struct list_head *next = links->next;
4176 * A previously-existing snapshot is not in
4177 * the new snap context.
4179 * If the now-missing snapshot is the one
4180 * the image represents, clear its existence
4181 * flag so we can avoid sending any more
4184 if (rbd_dev->spec->snap_id == snap->id)
4185 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4186 dout("removing %ssnap id %llu\n",
4187 rbd_dev->spec->snap_id == snap->id ?
4189 (unsigned long long)snap->id);
4191 list_del(&snap->node);
4192 rbd_snap_destroy(snap);
4194 /* Done with this list entry; advance */
4200 snap_name = rbd_dev_snap_info(rbd_dev, snap_id,
4201 &snap_size, &snap_features);
4202 if (IS_ERR(snap_name)) {
4203 ret = PTR_ERR(snap_name);
4204 dout("failed to get snap info, error %d\n", ret);
/* NOTE(review): this dout prints snap_count as the "entry" value;
 * index looks like the intended argument — confirm upstream. */
4208 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4209 (unsigned long long)snap_id);
4210 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4211 struct rbd_snap *new_snap;
4213 /* We haven't seen this snapshot before */
4215 new_snap = rbd_snap_create(rbd_dev, snap_name,
4216 snap_id, snap_size, snap_features);
4217 if (IS_ERR(new_snap)) {
4218 ret = PTR_ERR(new_snap);
4219 dout(" failed to add dev, error %d\n", ret);
4223 /* New goes before existing, or at end of list */
/* NOTE(review): the " at end\n" arm embeds a second newline in the
 * dout format — likely unintended double newline. */
4225 dout(" added dev%s\n", snap ? "" : " at end\n");
4227 list_add_tail(&new_snap->node, &snap->node);
4229 list_add_tail(&new_snap->node, head);
4231 /* Already have this one */
4233 dout(" already present\n");
/* An unchanged snapshot must keep its size, name, and features. */
4235 rbd_assert(snap->size == snap_size);
4236 rbd_assert(!strcmp(snap->name, snap_name));
4237 rbd_assert(snap->features == snap_features);
4239 /* Done with this list entry; advance */
4241 links = links->next;
4244 /* Advance to the next entry in the snapshot context */
4248 dout("%s: done\n", __func__);
/* Error path: the list is inconsistent, so drop it entirely. */
4252 rbd_remove_all_snaps(rbd_dev);
4257 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
/* Register the rbd device on the rbd bus under ctl_mutex (nested class,
 * since the caller may already hold a related lock). */
4262 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4264 dev = &rbd_dev->dev;
4265 dev->bus = &rbd_bus_type;
4266 dev->type = &rbd_device_type;
4267 dev->parent = &rbd_root_dev;
4268 dev->release = rbd_dev_device_release;
/* The sysfs name is the numeric device id. */
4269 dev_set_name(dev, "%d", rbd_dev->dev_id);
4270 ret = device_register(dev);
4272 mutex_unlock(&ctl_mutex);
4277 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
/* Counterpart of rbd_bus_add_dev(): drop the device from the bus. */
4279 device_unregister(&rbd_dev->dev);
/* Highest device id handed out so far; ids start at 1. */
4282 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4285 * Get a unique rbd identifier for the given new rbd_dev, and add
4286 * the rbd_dev to the global list. The minimum rbd id is 1.
4288 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4290 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
/* Publication on the global list is protected by rbd_dev_list_lock. */
4292 spin_lock(&rbd_dev_list_lock);
4293 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4294 spin_unlock(&rbd_dev_list_lock);
4295 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4296 (unsigned long long) rbd_dev->dev_id);
4300 * Remove an rbd_dev from the global list, and record that its
4301 * identifier is no longer in use.
4303 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
/* Remove the device from the global list and, if it held the maximum
 * id, recompute the new maximum from the remaining devices. */
4305 struct list_head *tmp;
4306 int rbd_id = rbd_dev->dev_id;
4309 rbd_assert(rbd_id > 0);
4311 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4312 (unsigned long long) rbd_dev->dev_id);
4313 spin_lock(&rbd_dev_list_lock);
4314 list_del_init(&rbd_dev->node);
4317 * If the id being "put" is not the current maximum, there
4318 * is nothing special we need to do.
4320 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4321 spin_unlock(&rbd_dev_list_lock);
4326 * We need to update the current maximum id. Search the
4327 * list to find out what it is. We're more likely to find
4328 * the maximum at the end, so search the list backward.
4331 list_for_each_prev(tmp, &rbd_dev_list) {
4332 struct rbd_device *rbd_dev;
4334 rbd_dev = list_entry(tmp, struct rbd_device, node);
4335 if (rbd_dev->dev_id > max_id)
4336 max_id = rbd_dev->dev_id;
4338 spin_unlock(&rbd_dev_list_lock);
4341 * The max id could have been updated by rbd_dev_id_get(), in
4342 * which case it now accurately reflects the new maximum.
4343 * Be careful not to overwrite the maximum value in that
4346 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4347 dout(" max dev id has been reset\n");
4351 * Skips over white space at *buf, and updates *buf to point to the
4352 * first found non-space character (if any). Returns the length of
4353 * the token (string of non-white space characters) found. Note
4354 * that *buf must be terminated with '\0'.
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() returns nonzero in the
	 * "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";

	/* Advance past any leading whitespace to the token start... */
	*buf += strspn(*buf, whitespace);

	/* ...and report the length of the token beginning there. */
	return strcspn(*buf, whitespace);
}
4370 * Finds the next token in *buf, and if the provided token buffer is
4371 * big enough, copies the found token into it. The result, if
4372 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4373 * must be terminated with '\0' on entry.
4375 * Returns the length of the token found (not including the '\0').
4376 * Return value will be 0 if no token is found, and it will be >=
4377 * token_size if the token would not fit.
4379 * The *buf pointer will be updated to point beyond the end of the
4380 * found token. Note that this occurs even if the token buffer is
4381 * too small to hold it.
4383 static inline size_t copy_token(const char **buf,
/* Copy the next token into the caller's buffer only when it fits
 * (including the terminating '\0'); see the header comment above. */
4389 len = next_token(buf);
4390 if (len < token_size) {
4391 memcpy(token, *buf, len);
/* Guarantee NUL termination of the copied token. */
4392 *(token + len) = '\0';
4400 * Finds the next token in *buf, dynamically allocates a buffer big
4401 * enough to hold a copy of it, and copies the token into the new
4402 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4403 * that a duplicate buffer is created even for a zero-length token.
4405 * Returns a pointer to the newly-allocated duplicate, or a null
4406 * pointer if memory for the duplicate was not available. If
4407 * the lenp argument is a non-null pointer, the length of the token
4408 * (not including the '\0') is returned in *lenp.
4410 * If successful, the *buf pointer will be updated to point beyond
4411 * the end of the found token.
4413 * Note: uses GFP_KERNEL for allocation.
4415 static inline char *dup_token(const char **buf, size_t *lenp)
/* Duplicate the next token into a freshly allocated, NUL-terminated
 * buffer; see the header comment above for the full contract. */
4420 len = next_token(buf);
/* len + 1 leaves room for the terminator written below. */
4421 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4424 *(dup + len) = '\0';
4434 * Parse the options provided for an "rbd add" (i.e., rbd image
4435 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4436 * and the data written is passed here via a NUL-terminated buffer.
4437 * Returns 0 if successful or an error code otherwise.
4439 * The information extracted from these options is recorded in
4440 * the other parameters which return dynamically-allocated
4443 * The address of a pointer that will refer to a ceph options
4444 * structure. Caller must release the returned pointer using
4445 * ceph_destroy_options() when it is no longer needed.
4447 * Address of an rbd options pointer. Fully initialized by
4448 * this function; caller must release with kfree().
4450 * Address of an rbd image specification pointer. Fully
4451 * initialized by this function based on parsed options.
4452 * Caller must release with rbd_spec_put().
4454 * The options passed take this form:
4455 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4458 * A comma-separated list of one or more monitor addresses.
4459 * A monitor address is an ip address, optionally followed
4460 * by a port number (separated by a colon).
4461 * I.e.: ip1[:port1][,ip2[:port2]...]
4463 * A comma-separated list of ceph and/or rbd options.
4465 * The name of the rados pool containing the rbd image.
4467 * The name of the image in that pool to map.
4469 * An optional snapshot id. If provided, the mapping will
4470 * present data from the image at the time that snapshot was
4471 * created. The image head is used if no snapshot id is
4472 * provided. Snapshot mappings are always read-only.
4474 static int rbd_add_parse_args(const char *buf,
4475 struct ceph_options **ceph_opts,
4476 struct rbd_options **opts,
4477 struct rbd_spec **rbd_spec)
/* Parse an "rbd add" request; see the long header comment above for
 * the option syntax and ownership rules of the returned objects. */
4481 const char *mon_addrs;
4483 size_t mon_addrs_size;
4484 struct rbd_spec *spec = NULL;
4485 struct rbd_options *rbd_opts = NULL;
4486 struct ceph_options *copts;
4489 /* The first four tokens are required */
4491 len = next_token(&buf);
4493 rbd_warn(NULL, "no monitor address(es) provided");
/* Size includes room for a terminating '\0'. */
4497 mon_addrs_size = len + 1;
4501 options = dup_token(&buf, NULL);
4505 rbd_warn(NULL, "no options provided");
4509 spec = rbd_spec_alloc();
4513 spec->pool_name = dup_token(&buf, NULL);
4514 if (!spec->pool_name)
4516 if (!*spec->pool_name) {
4517 rbd_warn(NULL, "no pool name provided");
4521 spec->image_name = dup_token(&buf, NULL);
4522 if (!spec->image_name)
4524 if (!*spec->image_name) {
4525 rbd_warn(NULL, "no image name provided");
4530 * Snapshot name is optional; default is to use "-"
4531 * (indicating the head/no snapshot).
4533 len = next_token(&buf);
4535 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4536 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4537 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4538 ret = -ENAMETOOLONG;
4541 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4544 *(snap_name + len) = '\0';
4545 spec->snap_name = snap_name;
4547 /* Initialize all rbd options to the defaults */
4549 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4553 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
/* Let libceph parse the monitor addresses and generic options;
 * rbd-specific tokens are routed to parse_rbd_opts_token(). */
4555 copts = ceph_parse_options(options, mon_addrs,
4556 mon_addrs + mon_addrs_size - 1,
4557 parse_rbd_opts_token, rbd_opts);
4558 if (IS_ERR(copts)) {
4559 ret = PTR_ERR(copts);
4580 * An rbd format 2 image has a unique identifier, distinct from the
4581 * name given to it by the user. Internally, that identifier is
4582 * what's used to specify the names of objects related to the image.
4584 * A special "rbd id" object is used to map an rbd image name to its
4585 * id. If that object doesn't exist, then there is no v2 rbd image
4586 * with the supplied name.
4588 * This function will record the given rbd_dev's image_id field if
4589 * it can be determined, and in that case will return 0. If any
4590 * errors occur a negative errno will be returned and the rbd_dev's
4591 * image_id field will be unchanged (and should be NULL).
4593 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
/* Determine the image id and format; see the header comment above.
 * Format 1 images have no id object, so ENOENT means "format 1". */
4602 * When probing a parent image, the image id is already
4603 * known (and the image name likely is not). There's no
4604 * need to fetch the image id again in this case. We
4605 * do still need to set the image format though.
4607 if (rbd_dev->spec->image_id) {
/* An empty (but present) image id marks a format 1 image. */
4608 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4614 * First, see if the format 2 image id file exists, and if
4615 * so, get the image's persistent id from it.
4617 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4618 object_name = kmalloc(size, GFP_NOIO);
4621 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4622 dout("rbd id object name is %s\n", object_name);
4624 /* Response will be an encoded string, which includes a length */
4626 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4627 response = kzalloc(size, GFP_NOIO);
4633 /* If it doesn't exist we'll assume it's a format 1 image */
4635 ret = rbd_obj_method_sync(rbd_dev, object_name,
4636 "rbd", "get_id", NULL, 0,
4637 response, RBD_IMAGE_ID_LEN_MAX);
4638 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4639 if (ret == -ENOENT) {
/* Format 1: record an empty image id. */
4640 image_id = kstrdup("", GFP_KERNEL);
4641 ret = image_id ? 0 : -ENOMEM;
4643 rbd_dev->image_format = 1;
4644 } else if (ret > sizeof (__le32)) {
/* Format 2: decode the length-prefixed id string from the reply. */
4647 image_id = ceph_extract_encoded_string(&p, p + ret,
4649 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4651 rbd_dev->image_format = 2;
4657 rbd_dev->spec->image_id = image_id;
4658 dout("image_id is %s\n", image_id);
4667 /* Undo whatever state changes are made by v1 or v2 image probe */
4669 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4671 struct rbd_image_header *header;
/* Drop parent linkage and its spec reference first. */
4673 rbd_dev_remove_parent(rbd_dev);
4674 rbd_spec_put(rbd_dev->parent_spec);
4675 rbd_dev->parent_spec = NULL;
4676 rbd_dev->parent_overlap = 0;
4678 /* Free dynamic fields from the header, then zero it out */
4680 header = &rbd_dev->header;
4681 ceph_put_snap_context(header->snapc);
4682 kfree(header->snap_sizes);
4683 kfree(header->snap_names);
4684 kfree(header->object_prefix);
/* Zeroing the header also nulls the freed pointers above. */
4685 memset(header, 0, sizeof (*header));
4688 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
/* Probe a format 1 image: read its on-disk header into rbd_dev. */
4692 /* Populate rbd image metadata */
4694 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4698 /* Version 1 images have no parent (no layering) */
4700 rbd_dev->parent_spec = NULL;
4701 rbd_dev->parent_overlap = 0;
4703 dout("discovered version 1 image, header name is %s\n",
4704 rbd_dev->header_name);
/* Error path: release the header name and image id set up earlier. */
4709 kfree(rbd_dev->header_name);
4710 rbd_dev->header_name = NULL;
4711 kfree(rbd_dev->spec->image_id);
4712 rbd_dev->spec->image_id = NULL;
4717 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
/* Probe a format 2 image: fetch size, object prefix, features, and
 * optional parent/striping info, then the snapshot context. */
4721 ret = rbd_dev_v2_image_size(rbd_dev);
4725 /* Get the object prefix (a.k.a. block_name) for the image */
4727 ret = rbd_dev_v2_object_prefix(rbd_dev);
4731 /* Get the and check features for the image */
4733 ret = rbd_dev_v2_features(rbd_dev);
4737 /* If the image supports layering, get the parent info */
4739 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4740 ret = rbd_dev_v2_parent_info(rbd_dev);
4745 * Don't print a warning for parent images. We can
4746 * tell at this point because we won't know its pool
4747 * name yet (just its pool id).
4749 if (rbd_dev->spec->pool_name)
4750 rbd_warn(rbd_dev, "WARNING: kernel layering "
4751 "is EXPERIMENTAL!");
4754 /* If the image supports fancy striping, get its parameters */
4756 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4757 ret = rbd_dev_v2_striping_info(rbd_dev);
4762 /* crypto and compression type aren't (yet) supported for v2 images */
4764 rbd_dev->header.crypt_type = 0;
4765 rbd_dev->header.comp_type = 0;
4767 /* Get the snapshot context, plus the header version */
4769 ret = rbd_dev_v2_snap_context(rbd_dev);
4773 dout("discovered version 2 image, header name is %s\n",
4774 rbd_dev->header_name);
/* Error path: unwind everything set up during this probe. */
4778 rbd_dev->parent_overlap = 0;
4779 rbd_spec_put(rbd_dev->parent_spec);
4780 rbd_dev->parent_spec = NULL;
4781 kfree(rbd_dev->header_name);
4782 rbd_dev->header_name = NULL;
4783 kfree(rbd_dev->header.object_prefix);
4784 rbd_dev->header.object_prefix = NULL;
4789 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
/* If this image has a parent, create and probe an rbd_dev for it and
 * link it in; a device with no parent_spec returns immediately. */
4791 struct rbd_device *parent = NULL;
4792 struct rbd_spec *parent_spec;
4793 struct rbd_client *rbdc;
4796 if (!rbd_dev->parent_spec)
4799 * We need to pass a reference to the client and the parent
4800 * spec when creating the parent rbd_dev. Images related by
4801 * parent/child relationships always share both.
4803 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4804 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4807 parent = rbd_dev_create(rbdc, parent_spec);
4811 ret = rbd_dev_image_probe(parent);
4814 rbd_dev->parent = parent;
/* Error path releases the references taken above.
 * NOTE(review): the kfree of this device's header_name here looks
 * out of place for a parent-probe failure — confirm against upstream. */
4819 rbd_spec_put(rbd_dev->parent_spec);
4820 kfree(rbd_dev->header_name);
4821 rbd_dev_destroy(parent);
4823 rbd_put_client(rbdc);
4824 rbd_spec_put(parent_spec);
4830 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
/* Make a probed image visible as a block device: assign an id, grab a
 * block major, create the gendisk, register on the bus, announce. */
4834 ret = rbd_dev_mapping_set(rbd_dev);
4838 /* generate unique id: find highest unique id, add one */
4839 rbd_dev_id_get(rbd_dev);
4841 /* Fill in the device name, now that we have its id. */
4842 BUILD_BUG_ON(DEV_NAME_LEN
4843 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4844 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4846 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major number. */
4848 ret = register_blkdev(0, rbd_dev->name);
4851 rbd_dev->major = ret;
4853 /* Set up the blkdev mapping. */
4855 ret = rbd_init_disk(rbd_dev);
4857 goto err_out_blkdev;
4859 ret = rbd_bus_add_dev(rbd_dev);
4863 /* Everything's ready. Announce the disk to the world. */
4865 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4866 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4867 add_disk(rbd_dev->disk);
4869 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4870 (unsigned long long) rbd_dev->mapping.size);
/* Error paths unwind in reverse order of setup. */
4875 rbd_free_disk(rbd_dev);
4877 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4879 rbd_dev_id_put(rbd_dev);
4880 rbd_dev_mapping_clear(rbd_dev);
4885 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
/* Build and record the header object name: format 1 uses
 * "<image_name><RBD_SUFFIX>", format 2 uses "<RBD_HEADER_PREFIX><image_id>". */
4887 struct rbd_spec *spec = rbd_dev->spec;
4890 /* Record the header object name for this rbd image. */
4892 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4894 if (rbd_dev->image_format == 1)
4895 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4897 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4899 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4900 if (!rbd_dev->header_name)
4903 if (rbd_dev->image_format == 1)
4904 sprintf(rbd_dev->header_name, "%s%s",
4905 spec->image_name, RBD_SUFFIX);
4907 sprintf(rbd_dev->header_name, "%s%s",
4908 RBD_HEADER_PREFIX, spec->image_id);
4912 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
/* Tear down everything rbd_dev_image_probe() set up, then destroy
 * the device structure itself. */
4916 rbd_remove_all_snaps(rbd_dev);
4917 rbd_dev_unprobe(rbd_dev);
/* Cancel the header watch; a failure here is only worth a warning. */
4918 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4920 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4921 kfree(rbd_dev->header_name);
4922 rbd_dev->header_name = NULL;
4923 rbd_dev->image_format = 0;
4924 kfree(rbd_dev->spec->image_id);
4925 rbd_dev->spec->image_id = NULL;
4927 rbd_dev_destroy(rbd_dev);
4931 * Probe for the existence of the header object for the given rbd
4932 * device. For format 2 images this includes determining the image
4935 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
/* Full image probe: determine the format, set up the header name and
 * watch, run the format-specific probe, then fill in snapshots, spec
 * names, and any parent image. */
4941 * Get the id from the image id object. If it's not a
4942 * format 2 image, we'll get ENOENT back, and we'll assume
4943 * it's a format 1 image.
4945 ret = rbd_dev_image_id(rbd_dev);
4948 rbd_assert(rbd_dev->spec->image_id);
4949 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4951 ret = rbd_dev_header_name(rbd_dev);
4953 goto err_out_format;
4955 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4957 goto out_header_name;
4959 if (rbd_dev->image_format == 1)
4960 ret = rbd_dev_v1_probe(rbd_dev);
4962 ret = rbd_dev_v2_probe(rbd_dev);
4966 ret = rbd_dev_snaps_update(rbd_dev);
4970 ret = rbd_dev_spec_update(rbd_dev);
4974 ret = rbd_dev_probe_parent(rbd_dev);
/* Error paths unwind in reverse order of the steps above. */
4979 rbd_remove_all_snaps(rbd_dev);
4981 rbd_dev_unprobe(rbd_dev);
4983 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4985 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4987 kfree(rbd_dev->header_name);
4988 rbd_dev->header_name = NULL;
4990 rbd_dev->image_format = 0;
4991 kfree(rbd_dev->spec->image_id);
4992 rbd_dev->spec->image_id = NULL;
4994 dout("probe failed, returning %d\n", ret);
4999 static ssize_t rbd_add(struct bus_type *bus,
/* sysfs "add" handler: parse the request, connect to the cluster,
 * resolve the pool, then probe the image and set up the device. */
5003 struct rbd_device *rbd_dev = NULL;
5004 struct ceph_options *ceph_opts = NULL;
5005 struct rbd_options *rbd_opts = NULL;
5006 struct rbd_spec *spec = NULL;
5007 struct rbd_client *rbdc;
5008 struct ceph_osd_client *osdc;
/* Hold a module reference for the lifetime of the mapped device. */
5011 if (!try_module_get(THIS_MODULE))
5014 /* parse add command */
5015 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5017 goto err_out_module;
5019 rbdc = rbd_get_client(ceph_opts);
5024 ceph_opts = NULL; /* rbd_dev client now owns this */
5027 osdc = &rbdc->client->osdc;
5028 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5030 goto err_out_client;
5031 spec->pool_id = (u64)rc;
5033 /* The ceph file layout needs to fit pool id in 32 bits */
5035 if (spec->pool_id > (u64)U32_MAX) {
5036 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5037 (unsigned long long)spec->pool_id, U32_MAX);
5039 goto err_out_client;
5042 rbd_dev = rbd_dev_create(rbdc, spec);
5044 goto err_out_client;
/* Ownership of the client and spec transfers into rbd_dev. */
5045 rbdc = NULL; /* rbd_dev now owns this */
5046 spec = NULL; /* rbd_dev now owns this */
5048 rbd_dev->mapping.read_only = rbd_opts->read_only;
5050 rbd_opts = NULL; /* done with this */
5052 rc = rbd_dev_image_probe(rbd_dev);
5054 goto err_out_rbd_dev;
5056 rc = rbd_dev_device_setup(rbd_dev);
/* Error paths release whatever this function still owns. */
5060 rbd_dev_image_release(rbd_dev);
5062 rbd_dev_destroy(rbd_dev);
5064 rbd_put_client(rbdc);
5067 ceph_destroy_options(ceph_opts);
5071 module_put(THIS_MODULE);
5073 dout("Error adding device %s\n", buf);
5078 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
/* Look up a device by id on the global list (under the list lock). */
5080 struct list_head *tmp;
5081 struct rbd_device *rbd_dev;
5083 spin_lock(&rbd_dev_list_lock);
5084 list_for_each(tmp, &rbd_dev_list) {
5085 rbd_dev = list_entry(tmp, struct rbd_device, node);
5086 if (rbd_dev->dev_id == dev_id) {
/* Found: drop the lock before returning the match. */
5087 spin_unlock(&rbd_dev_list_lock);
5091 spin_unlock(&rbd_dev_list_lock);
5095 static void rbd_dev_device_release(struct device *dev)
/* Device-model release callback: undo rbd_dev_device_setup(). */
5097 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5099 rbd_free_disk(rbd_dev);
5100 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5101 rbd_dev_clear_mapping(rbd_dev);
5102 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5104 rbd_dev_id_put(rbd_dev);
5105 rbd_dev_mapping_clear(rbd_dev);
5108 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
/* Tear down the parent chain bottom-up: repeatedly find the deepest
 * parent (the one with no grandparent) and release it. */
5110 while (rbd_dev->parent) {
5111 struct rbd_device *first = rbd_dev;
5112 struct rbd_device *second = first->parent;
5113 struct rbd_device *third;
5116 * Follow to the parent with no grandparent and
5119 while (second && (third = second->parent)) {
5124 rbd_dev_image_release(second);
/* Detach the released parent from its child. */
5125 first->parent = NULL;
5126 first->parent_overlap = 0;
5128 rbd_assert(first->parent_spec);
5129 rbd_spec_put(first->parent_spec);
5130 first->parent_spec = NULL;
5134 static ssize_t rbd_remove(struct bus_type *bus,
/* sysfs "remove" handler: parse a device id and tear down the mapping
 * unless the device is still open. */
5138 struct rbd_device *rbd_dev = NULL;
/* strict_strtoul is the pre-kstrto* numeric-parse helper. */
5143 ret = strict_strtoul(buf, 10, &ul);
5147 /* convert to int; abort if we lost anything in the conversion */
5148 target_id = (int) ul;
5149 if (target_id != ul)
5152 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5154 rbd_dev = __rbd_get_dev(target_id);
/* Refuse removal while the block device is held open. */
5160 spin_lock_irq(&rbd_dev->lock);
5161 if (rbd_dev->open_count)
5164 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5165 spin_unlock_irq(&rbd_dev->lock);
5169 rbd_bus_del_dev(rbd_dev);
5170 rbd_dev_image_release(rbd_dev);
/* Drop the module reference taken in rbd_add(). */
5171 module_put(THIS_MODULE);
5173 mutex_unlock(&ctl_mutex);
5179 * create control files in sysfs
5182 static int rbd_sysfs_init(void)
/* Register the rbd root device and bus; unwind the device if the bus
 * registration fails. */
5186 ret = device_register(&rbd_root_dev);
5190 ret = bus_register(&rbd_bus_type);
5192 device_unregister(&rbd_root_dev);
5197 static void rbd_sysfs_cleanup(void)
/* Counterpart of rbd_sysfs_init(): bus first, then the root device. */
5199 bus_unregister(&rbd_bus_type);
5200 device_unregister(&rbd_root_dev);
5203 static int __init rbd_init(void)
/* Module init: verify libceph compatibility, then create sysfs nodes. */
5207 if (!libceph_compatible(NULL)) {
5208 rbd_warn(NULL, "libceph incompatibility (quitting)");
5212 rc = rbd_sysfs_init();
5215 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5219 static void __exit rbd_exit(void)
/* Module exit: remove the sysfs control files. */
5221 rbd_sysfs_cleanup();
/* Module entry/exit hookup and metadata. */
5224 module_init(rbd_init);
5225 module_exit(rbd_exit);
5227 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5228 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5229 MODULE_DESCRIPTION("rados block device");
5231 /* following authorship retained from original osdblk.c */
5232 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5234 MODULE_LICENSE("GPL");