/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
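
/*
 * Illustrative helpers (hypothetical, not part of the original driver):
 * how byte counts relate to sectors under the symbols above.  The
 * driver itself just applies the shifts directly.
 */
static inline u64 rbd_bytes_to_sectors(u64 bytes)
{
	return bytes >> SECTOR_SHIFT;	/* e.g. 4096 bytes -> 8 sectors */
}

static inline u64 rbd_sectors_to_bytes(u64 sectors)
{
	return sectors << SECTOR_SHIFT;	/* e.g. 8 sectors -> 4096 bytes */
}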
/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}
/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
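
/*
 * Usage sketch (illustrative; mirrors how parent_ref is handled later
 * in this file): the pair above implements a counter that saturates
 * instead of wrapping, so an overflow or an increment-from-zero shows
 * up as an error value rather than silent refcount corruption.
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		;	// reference taken
 *	else
 *		;	// counter was 0 (torn down) or overflowed
 */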
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
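
/*
 * Worked check of the limit above: a ceph_snap_context holding 510
 * snapshot ids needs 510 * sizeof(u64) = 4080 bytes for the id array,
 * which together with the structure header still fits in a 4KB page.
 */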
#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64
/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
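
/*
 * Worked check: with 4-byte ints, MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, enough for the 10 digits of
 * INT_MAX plus a sign, so "rbd" + 11 digits + NUL fits comfortably
 * within DEV_NAME_LEN (32).
 */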
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
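
/*
 * Usage sketch for the iterators above (mirrors their use later in
 * this file, e.g. when aggregating transfer counts):
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */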
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	struct list_head	rq_queue;	/* incoming rq queue */
	spinlock_t		lock;		/* queue, flags, open_count */
	struct workqueue_struct	*rq_wq;
	struct work_struct	rq_work;

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);
/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
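
/*
 * Illustrative usage (assuming standard module parameter handling):
 *
 *	# modprobe rbd single_major=Y
 *
 * maps every rbd device under one shared major number, with minors
 * carved up per device via RBD_SINGLE_MAJOR_PART_SHIFT below.
 */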
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);
static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
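
/*
 * Worked example for the conversions above: dev_id 3 maps to minor
 * 3 << 4 = 48, leaving minors 49..63 for partitions rbd3p1..rbd3p15;
 * shifting any of those minors back down yields dev_id 3 again.
 */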
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	/* Hide the single-major attributes unless the option is enabled */
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}
static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* A mapped snapshot doesn't allow writes */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}
static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
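
/*
 * Illustrative option strings accepted by the table above (appended
 * to the image spec written to the sysfs "add" file):
 *
 *	"read_only" or "ro"	-> rbd_opts->read_only = true
 *	"read_write" or "rw"	-> rbd_opts->read_only = false
 */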
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	default:
		return "???";
	}
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock internally to unlink the client, so the
 * caller must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
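
/*
 * Worked example: for a snapshot array kept in descending order,
 * say { 12, 7, 3 }, looking up id 7 with snapid_compare_reverse()
 * lands on index 1; looking up id 5 finds nothing and yields
 * BAD_SNAP_INDEX.
 */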
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}
static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
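
/*
 * Worked example for the segment helpers, assuming the common
 * obj_order of 22 (4 MiB objects): image offset 0x500000 with length
 * 0x400000 falls in segment 1, at offset 0x100000 within the object,
 * and rbd_segment_length() clips the length to 0x300000 so the
 * request ends exactly at the object boundary.
 */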
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone(bio_src, gfpmask);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
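
/*
 * Worked example: cloning 6K starting at offset 2K from a chain of
 * two 4K bios yields a two-bio clone chain: the last 2K of the first
 * bio followed by all 4K of the second.  Both source bios are then
 * fully consumed, so *bio_src comes back NULL and *offset comes
 * back 0.
 */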
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
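
/*
 * Worked example: with 4 MiB objects and a parent overlap of 6 MiB,
 * the overlap rounds up to 8 MiB, so an object request at img_offset
 * 6.5 MiB still counts as overlapping: its backing object spans
 * 4..8 MiB, and the first 2 MiB of that object lie within the real
 * 6 MiB overlap.
 */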
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	ceph_osdc_cancel_request(obj_request->osd_req);
}

/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	int ret;

	dout("%s %p\n", __func__, obj_request);

	ret = wait_for_completion_interruptible(&obj_request->completion);
	if (ret < 0) {
		dout("%s %p interrupted\n", __func__, obj_request);
		rbd_obj_request_end(obj_request);
		return ret;
	}

	dout("%s %p done\n", __func__, obj_request);
	return 0;
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);

	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
		/* fall through */
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
/*
 * Create an osd request.  A read request has one osd op (read).
 * A write request has either one (watch) or two (hint+write) osd ops.
 * (All rbd data writes are prefixed with an allocation hint op, but
 * technically osd watch is a write request, hence this distinction.)
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					unsigned int num_ops,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request) && op_type == OBJ_OP_WRITE) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(img_request_write_test(img_request));
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	/* Allocate and initialize the request, for the num_ops ops */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
					  GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (op_type == OBJ_OP_WRITE)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);

	return osd_req;
}
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has three osd ops,
 * a copyup method call, a hint op, and a write op.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the three ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);

	return osd_req;
}
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return false;
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}
static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;
		enum obj_operation_type op_type;

		op_type = img_request_write_test(img_request) ? OBJ_OP_WRITE :
				OBJ_OP_READ;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
			obj_op_name(op_type), obj_request->length,
			obj_request->img_offset, obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);
	rbd_img_request_put(img_request);

	if (!more)
		rbd_img_request_complete(img_request);
}
2260 * Split up an image request into one or more object requests, each
2261 * to a different object. The "type" parameter indicates whether
2262 * "data_desc" is the pointer to the head of a list of bio
2263 * structures, or the base of a page array. In either case this
2264 * function assumes data_desc describes memory sufficient to hold
2265 * all data described by the image request.
2267 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2268 enum obj_request_type type,
2269 void *data_desc)
2271 struct rbd_device *rbd_dev = img_request->rbd_dev;
2272 struct rbd_obj_request *obj_request = NULL;
2273 struct rbd_obj_request *next_obj_request;
2274 struct bio *bio_list = NULL;
2275 unsigned int bio_offset = 0;
2276 struct page **pages = NULL;
2277 enum obj_operation_type op_type;
2282 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2283 (int)type, data_desc);
2285 img_offset = img_request->offset;
2286 resid = img_request->length;
2287 rbd_assert(resid > 0);
2289 if (type == OBJ_REQUEST_BIO) {
2290 bio_list = data_desc;
2291 rbd_assert(img_offset ==
2292 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2293 } else {
2294 rbd_assert(type == OBJ_REQUEST_PAGES);
2295 pages = data_desc;
2298 while (resid) {
2299 struct ceph_osd_request *osd_req;
2300 const char *object_name;
2303 unsigned int which = 0;
2305 object_name = rbd_segment_name(rbd_dev, img_offset);
2308 offset = rbd_segment_offset(rbd_dev, img_offset);
2309 length = rbd_segment_length(rbd_dev, img_offset, resid);
2310 obj_request = rbd_obj_request_create(object_name,
2311 offset, length, type);
2312 /* object request has its own copy of the object name */
2313 rbd_segment_name_free(object_name);
2318 * set obj_request->img_request before creating the
2319 * osd_request so that it gets the right snapc
2321 rbd_img_obj_request_add(img_request, obj_request);
2323 if (type == OBJ_REQUEST_BIO) {
2324 unsigned int clone_size;
2326 rbd_assert(length <= (u64)UINT_MAX);
2327 clone_size = (unsigned int)length;
2328 obj_request->bio_list =
2329 bio_chain_clone_range(&bio_list,
2330 &bio_offset,
2331 clone_size,
2332 GFP_ATOMIC);
2333 if (!obj_request->bio_list)
2334 goto out_unwind;
2335 } else {
2336 unsigned int page_count;
2338 obj_request->pages = pages;
2339 page_count = (u32)calc_pages_for(offset, length);
2340 obj_request->page_count = page_count;
2341 if ((offset + length) & ~PAGE_MASK)
2342 page_count--; /* more on last page */
2343 pages += page_count;
2346 if (img_request_write_test(img_request)) {
2347 op_type = OBJ_OP_WRITE;
2348 opcode = CEPH_OSD_OP_WRITE;
2349 } else {
2350 op_type = OBJ_OP_READ;
2351 opcode = CEPH_OSD_OP_READ;
2354 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2355 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2356 obj_request);
2357 if (!osd_req)
2358 goto out_unwind;
2359 obj_request->osd_req = osd_req;
2360 obj_request->callback = rbd_img_obj_callback;
2361 rbd_img_request_get(img_request);
2363 if (op_type == OBJ_OP_WRITE) {
2364 osd_req_op_alloc_hint_init(osd_req, which,
2365 rbd_obj_bytes(&rbd_dev->header),
2366 rbd_obj_bytes(&rbd_dev->header));
2367 which++;
2370 osd_req_op_extent_init(osd_req, which, opcode, offset, length,
2371 0, 0);
2372 if (type == OBJ_REQUEST_BIO)
2373 osd_req_op_extent_osd_data_bio(osd_req, which,
2374 obj_request->bio_list, length);
2375 else
2376 osd_req_op_extent_osd_data_pages(osd_req, which,
2377 obj_request->pages, length,
2378 offset & ~PAGE_MASK, false, false);
2380 if (op_type == OBJ_OP_WRITE)
2381 rbd_osd_req_format_write(obj_request);
2382 else
2383 rbd_osd_req_format_read(obj_request);
2385 obj_request->img_offset = img_offset;
2387 img_offset += length;
2388 resid -= length;
2391 return 0;
2393 out_unwind:
2394 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2395 rbd_img_obj_request_del(img_request, obj_request);
2397 return -ENOMEM;
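/*
 * Worked example for the loop above (illustrative numbers, assuming
 * the default 4 MiB object size, i.e. obj_order 22): an image request
 * at offset 6 MiB with length 3 MiB yields two object requests.
 * Pass 1: segment offset 2 MiB within its object, segment length
 * min(4 MiB - 2 MiB, 3 MiB) = 2 MiB. Pass 2: img_offset 8 MiB,
 * segment offset 0, length 1 MiB; resid reaches 0 and the loop exits.
 */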
2400 static void
2401 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2403 struct rbd_img_request *img_request;
2404 struct rbd_device *rbd_dev;
2405 struct page **pages;
2408 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2409 rbd_assert(obj_request_img_data_test(obj_request));
2410 img_request = obj_request->img_request;
2411 rbd_assert(img_request);
2413 rbd_dev = img_request->rbd_dev;
2414 rbd_assert(rbd_dev);
2416 pages = obj_request->copyup_pages;
2417 rbd_assert(pages != NULL);
2418 obj_request->copyup_pages = NULL;
2419 page_count = obj_request->copyup_page_count;
2420 rbd_assert(page_count);
2421 obj_request->copyup_page_count = 0;
2422 ceph_release_page_vector(pages, page_count);
2425 * We want the transfer count to reflect the size of the
2426 * original write request. There is no such thing as a
2427 * successful short write, so if the request was successful
2428 * we can just set it to the originally-requested length.
2430 if (!obj_request->result)
2431 obj_request->xferred = obj_request->length;
2433 /* Finish up with the normal image object callback */
2435 rbd_img_obj_callback(obj_request);
2438 static void
2439 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2441 struct rbd_obj_request *orig_request;
2442 struct ceph_osd_request *osd_req;
2443 struct ceph_osd_client *osdc;
2444 struct rbd_device *rbd_dev;
2445 struct page **pages;
2452 rbd_assert(img_request_child_test(img_request));
2454 /* First get what we need from the image request */
2456 pages = img_request->copyup_pages;
2457 rbd_assert(pages != NULL);
2458 img_request->copyup_pages = NULL;
2459 page_count = img_request->copyup_page_count;
2460 rbd_assert(page_count);
2461 img_request->copyup_page_count = 0;
2463 orig_request = img_request->obj_request;
2464 rbd_assert(orig_request != NULL);
2465 rbd_assert(obj_request_type_valid(orig_request->type));
2466 img_result = img_request->result;
2467 parent_length = img_request->length;
2468 rbd_assert(parent_length == img_request->xferred);
2469 rbd_img_request_put(img_request);
2471 rbd_assert(orig_request->img_request);
2472 rbd_dev = orig_request->img_request->rbd_dev;
2473 rbd_assert(rbd_dev);
2476 * If the overlap has become 0 (most likely because the
2477 * image has been flattened) we need to free the pages
2478 * and re-submit the original write request.
2480 if (!rbd_dev->parent_overlap) {
2481 struct ceph_osd_client *osdc;
2483 ceph_release_page_vector(pages, page_count);
2484 osdc = &rbd_dev->rbd_client->client->osdc;
2485 img_result = rbd_obj_request_submit(osdc, orig_request);
2486 if (!img_result)
2487 return;
2490 if (img_result)
2491 goto out_err;
2494 * The original osd request is of no use to us any more.
2495 * We need a new one that can hold the three ops in a copyup
2496 * request. Allocate the new copyup osd request for the
2497 * original request, and release the old one.
2499 img_result = -ENOMEM;
2500 osd_req = rbd_osd_req_create_copyup(orig_request);
2501 if (!osd_req)
2502 goto out_err;
2503 rbd_osd_req_destroy(orig_request->osd_req);
2504 orig_request->osd_req = osd_req;
2505 orig_request->copyup_pages = pages;
2506 orig_request->copyup_page_count = page_count;
2508 /* Initialize the copyup op */
2510 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2511 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2512 false, false);
2514 /* Then the hint op */
2516 osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
2517 rbd_obj_bytes(&rbd_dev->header));
2519 /* And the original write request op */
2521 offset = orig_request->offset;
2522 length = orig_request->length;
2523 osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
2524 offset, length, 0, 0);
2525 if (orig_request->type == OBJ_REQUEST_BIO)
2526 osd_req_op_extent_osd_data_bio(osd_req, 2,
2527 orig_request->bio_list, length);
2528 else
2529 osd_req_op_extent_osd_data_pages(osd_req, 2,
2530 orig_request->pages, length,
2531 offset & ~PAGE_MASK, false, false);
2533 rbd_osd_req_format_write(orig_request);
2535 /* All set, send it off. */
2537 orig_request->callback = rbd_img_obj_copyup_callback;
2538 osdc = &rbd_dev->rbd_client->client->osdc;
2539 img_result = rbd_obj_request_submit(osdc, orig_request);
2540 if (!img_result)
2541 return;
2542 out_err:
2543 /* Record the error code and complete the request */
2545 orig_request->result = img_result;
2546 orig_request->xferred = 0;
2547 obj_request_done_set(orig_request);
2548 rbd_obj_request_complete(orig_request);
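/*
 * The copyup request built above carries three ops, in order: op 0
 * is a CEPH_OSD_OP_CALL to the "rbd" class "copyup" method, carrying
 * the parent data just read; op 1 is the allocation hint; op 2
 * replays the original write at its original offset/length. The
 * copyup method is meant to populate the target object with the
 * parent data only if the object does not already exist, so the
 * clone's object ends up holding parent data overlaid with the write.
 */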
2552 * Read from the parent image the range of data that covers the
2553 * entire target of the given object request. This is used for
2554 * satisfying a layered image write request when the target of an
2555 * object request from the image request does not exist.
2557 * A page array big enough to hold the returned data is allocated
2558 * and supplied to rbd_img_request_fill() as the "data descriptor."
2559 * When the read completes, this page array will be transferred to
2560 * the original object request for the copyup operation.
2562 * If an error occurs, record it as the result of the original
2563 * object request and mark it done so it gets completed.
2565 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2567 struct rbd_img_request *img_request = NULL;
2568 struct rbd_img_request *parent_request = NULL;
2569 struct rbd_device *rbd_dev;
2572 struct page **pages = NULL;
2576 rbd_assert(obj_request_img_data_test(obj_request));
2577 rbd_assert(obj_request_type_valid(obj_request->type));
2579 img_request = obj_request->img_request;
2580 rbd_assert(img_request != NULL);
2581 rbd_dev = img_request->rbd_dev;
2582 rbd_assert(rbd_dev->parent != NULL);
2585 * Determine the byte range covered by the object in the
2586 * child image to which the original request was to be sent.
2588 img_offset = obj_request->img_offset - obj_request->offset;
2589 length = (u64)1 << rbd_dev->header.obj_order;
2592 * There is no defined parent data beyond the parent
2593 * overlap, so limit what we read at that boundary if
2594 * necessary.
2596 if (img_offset + length > rbd_dev->parent_overlap) {
2597 rbd_assert(img_offset < rbd_dev->parent_overlap);
2598 length = rbd_dev->parent_overlap - img_offset;
2602 * Allocate a page array big enough to receive the data read
2603 * from the parent.
2605 page_count = (u32)calc_pages_for(0, length);
2606 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2607 if (IS_ERR(pages)) {
2608 result = PTR_ERR(pages);
2609 goto out_err;
2614 parent_request = rbd_parent_request_create(obj_request,
2615 img_offset, length);
2616 if (!parent_request)
2617 goto out_err;
2619 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2620 if (result)
2621 goto out_err;
2622 parent_request->copyup_pages = pages;
2623 parent_request->copyup_page_count = page_count;
2625 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2626 result = rbd_img_request_submit(parent_request);
2627 if (!result)
2628 return 0;
2630 parent_request->copyup_pages = NULL;
2631 parent_request->copyup_page_count = 0;
2632 parent_request->obj_request = NULL;
2633 rbd_obj_request_put(obj_request);
2634 out_err:
2635 if (pages)
2636 ceph_release_page_vector(pages, page_count);
2637 if (parent_request)
2638 rbd_img_request_put(parent_request);
2639 obj_request->result = result;
2640 obj_request->xferred = 0;
2641 obj_request_done_set(obj_request);
2646 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2648 struct rbd_obj_request *orig_request;
2649 struct rbd_device *rbd_dev;
2652 rbd_assert(!obj_request_img_data_test(obj_request));
2655 * All we need from the object request is the original
2656 * request and the result of the STAT op. Grab those, then
2657 * we're done with the request.
2659 orig_request = obj_request->obj_request;
2660 obj_request->obj_request = NULL;
2661 rbd_obj_request_put(orig_request);
2662 rbd_assert(orig_request);
2663 rbd_assert(orig_request->img_request);
2665 result = obj_request->result;
2666 obj_request->result = 0;
2668 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2669 obj_request, orig_request, result,
2670 obj_request->xferred, obj_request->length);
2671 rbd_obj_request_put(obj_request);
2674 * If the overlap has become 0 (most likely because the
2675 * image has been flattened) we need to free the pages
2676 * and re-submit the original write request.
2678 rbd_dev = orig_request->img_request->rbd_dev;
2679 if (!rbd_dev->parent_overlap) {
2680 struct ceph_osd_client *osdc;
2682 osdc = &rbd_dev->rbd_client->client->osdc;
2683 result = rbd_obj_request_submit(osdc, orig_request);
2689 * Our only purpose here is to determine whether the object
2690 * exists, and we don't want to treat the non-existence as
2691 * an error. If something else comes back, transfer the
2692 * error to the original request and complete it now.
2694 if (!result) {
2695 obj_request_existence_set(orig_request, true);
2696 } else if (result == -ENOENT) {
2697 obj_request_existence_set(orig_request, false);
2698 } else if (result) {
2699 orig_request->result = result;
2704 * Resubmit the original request now that we have recorded
2705 * whether the target object exists.
2707 orig_request->result = rbd_img_obj_request_submit(orig_request);
2709 if (orig_request->result)
2710 rbd_obj_request_complete(orig_request);
2713 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2715 struct rbd_obj_request *stat_request;
2716 struct rbd_device *rbd_dev;
2717 struct ceph_osd_client *osdc;
2718 struct page **pages = NULL;
2724 * The response data for a STAT call consists of:
2725 * le64 length;
2726 * struct {
2727 * le32 tv_sec;
2728 * le32 tv_nsec;
2729 * } mtime;
2731 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2732 page_count = (u32)calc_pages_for(0, size);
2733 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2734 if (IS_ERR(pages))
2735 return PTR_ERR(pages);
2737 ret = -ENOMEM;
2738 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2739 OBJ_REQUEST_PAGES);
2740 if (!stat_request)
2741 goto out;
2743 rbd_obj_request_get(obj_request);
2744 stat_request->obj_request = obj_request;
2745 stat_request->pages = pages;
2746 stat_request->page_count = page_count;
2748 rbd_assert(obj_request->img_request);
2749 rbd_dev = obj_request->img_request->rbd_dev;
2750 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2751 stat_request);
2752 if (!stat_request->osd_req)
2753 goto out;
2754 stat_request->callback = rbd_img_obj_exists_callback;
2756 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2757 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2758 false, false);
2759 rbd_osd_req_format_read(stat_request);
2761 osdc = &rbd_dev->rbd_client->client->osdc;
2762 ret = rbd_obj_request_submit(osdc, stat_request);
2763 out:
2764 if (ret)
2765 rbd_obj_request_put(obj_request);
2767 return ret;
2770 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2772 struct rbd_img_request *img_request;
2773 struct rbd_device *rbd_dev;
2775 rbd_assert(obj_request_img_data_test(obj_request));
2777 img_request = obj_request->img_request;
2778 rbd_assert(img_request);
2779 rbd_dev = img_request->rbd_dev;
2782 if (!img_request_write_test(img_request))
2783 return true;
2785 /* Non-layered writes */
2786 if (!img_request_layered_test(img_request))
2787 return true;
2790 * Layered writes outside of the parent overlap range don't
2791 * share any data with the parent.
2793 if (!obj_request_overlaps_parent(obj_request))
2794 return true;
2797 * Entire-object layered writes - we will overwrite whatever
2798 * parent data there is anyway.
2800 if (!obj_request->offset &&
2801 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2802 return true;
2805 * If the object is known to already exist, its parent data has
2806 * already been copied.
2808 if (obj_request_known_test(obj_request) &&
2809 obj_request_exists_test(obj_request))
2810 return true;
2812 return false;
2815 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2817 if (img_obj_request_simple(obj_request)) {
2818 struct rbd_device *rbd_dev;
2819 struct ceph_osd_client *osdc;
2821 rbd_dev = obj_request->img_request->rbd_dev;
2822 osdc = &rbd_dev->rbd_client->client->osdc;
2824 return rbd_obj_request_submit(osdc, obj_request);
2828 * It's a layered write. The target object might exist but
2829 * we may not know that yet. If we know it doesn't exist,
2830 * start by reading the data for the full target object from
2831 * the parent so we can use it for a copyup to the target.
2833 if (obj_request_known_test(obj_request))
2834 return rbd_img_obj_parent_read_full(obj_request);
2836 /* We don't know whether the target exists. Go find out. */
2838 return rbd_img_obj_exists_submit(obj_request);
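/*
 * Summary of the dispatch above: reads, non-layered writes, writes
 * beyond the parent overlap, whole-object writes and writes to
 * objects known to exist are "simple" and go straight to the OSD.
 * A layered write to an object known to be absent first reads the
 * covering range from the parent (the copyup path); when existence
 * is unknown, a STAT is issued and the request is resubmitted from
 * rbd_img_obj_exists_callback() once the answer is recorded.
 */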
2841 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2843 struct rbd_obj_request *obj_request;
2844 struct rbd_obj_request *next_obj_request;
2846 dout("%s: img %p\n", __func__, img_request);
2847 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2850 ret = rbd_img_obj_request_submit(obj_request);
2851 if (ret)
2852 return ret;
2855 return 0;
2858 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2860 struct rbd_obj_request *obj_request;
2861 struct rbd_device *rbd_dev;
2866 rbd_assert(img_request_child_test(img_request));
2868 /* First get what we need from the image request and release it */
2870 obj_request = img_request->obj_request;
2871 img_xferred = img_request->xferred;
2872 img_result = img_request->result;
2873 rbd_img_request_put(img_request);
2876 * If the overlap has become 0 (most likely because the
2877 * image has been flattened) we need to re-submit the
2878 * original request.
2880 rbd_assert(obj_request);
2881 rbd_assert(obj_request->img_request);
2882 rbd_dev = obj_request->img_request->rbd_dev;
2883 if (!rbd_dev->parent_overlap) {
2884 struct ceph_osd_client *osdc;
2886 osdc = &rbd_dev->rbd_client->client->osdc;
2887 img_result = rbd_obj_request_submit(osdc, obj_request);
2888 if (!img_result)
2889 return;
2892 obj_request->result = img_result;
2893 if (obj_request->result)
2894 goto out;
2897 * We need to zero anything beyond the parent overlap
2898 * boundary. Since rbd_img_obj_request_read_callback()
2899 * will zero anything beyond the end of a short read, an
2900 * easy way to do this is to pretend the data from the
2901 * parent came up short--ending at the overlap boundary.
2903 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2904 obj_end = obj_request->img_offset + obj_request->length;
2905 if (obj_end > rbd_dev->parent_overlap) {
2906 u64 xferred = 0;
2908 if (obj_request->img_offset < rbd_dev->parent_overlap)
2909 xferred = rbd_dev->parent_overlap -
2910 obj_request->img_offset;
2912 obj_request->xferred = min(img_xferred, xferred);
2913 } else {
2914 obj_request->xferred = img_xferred;
2916 out:
2917 rbd_img_obj_request_read_callback(obj_request);
2918 rbd_obj_request_complete(obj_request);
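/*
 * Example of the clamping above (illustrative numbers): with a parent
 * overlap of 8 MiB, a parent read for an object request at img_offset
 * 7 MiB with length 2 MiB gives obj_end = 9 MiB. xferred is clamped
 * to 8 MiB - 7 MiB = 1 MiB, and the read callback then zeros the
 * final 1 MiB exactly as it would for a short read.
 */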
2921 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2923 struct rbd_img_request *img_request;
2926 rbd_assert(obj_request_img_data_test(obj_request));
2927 rbd_assert(obj_request->img_request != NULL);
2928 rbd_assert(obj_request->result == (s32) -ENOENT);
2929 rbd_assert(obj_request_type_valid(obj_request->type));
2931 /* rbd_read_finish(obj_request, obj_request->length); */
2932 img_request = rbd_parent_request_create(obj_request,
2933 obj_request->img_offset,
2934 obj_request->length);
2935 if (!img_request)
2936 goto out_err;
2939 if (obj_request->type == OBJ_REQUEST_BIO)
2940 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2941 obj_request->bio_list);
2942 else
2943 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2944 obj_request->pages);
2945 if (result)
2946 goto out_err;
2948 img_request->callback = rbd_img_parent_read_callback;
2949 result = rbd_img_request_submit(img_request);
2950 if (result)
2951 goto out_err;
2953 return;
2955 out_err:
2956 rbd_img_request_put(img_request);
2957 obj_request->result = result;
2958 obj_request->xferred = 0;
2959 obj_request_done_set(obj_request);
2962 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2964 struct rbd_obj_request *obj_request;
2965 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2968 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2969 OBJ_REQUEST_NODATA);
2970 if (!obj_request)
2971 return -ENOMEM;
2973 ret = -ENOMEM;
2974 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2975 obj_request);
2976 if (!obj_request->osd_req)
2977 goto out;
2979 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2980 notify_id, 0, 0);
2981 rbd_osd_req_format_read(obj_request);
2983 ret = rbd_obj_request_submit(osdc, obj_request);
2986 ret = rbd_obj_request_wait(obj_request);
2987 out:
2988 rbd_obj_request_put(obj_request);
2990 return ret;
2993 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2995 struct rbd_device *rbd_dev = (struct rbd_device *)data;
3001 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
3002 rbd_dev->header_name, (unsigned long long)notify_id,
3003 (unsigned int)opcode);
3006 * Until adequate refresh error handling is in place, there is
3007 * not much we can do here, except warn.
3009 * See http://tracker.ceph.com/issues/5040
3011 ret = rbd_dev_refresh(rbd_dev);
3013 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3015 ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
3017 rbd_warn(rbd_dev, "notify_ack ret %d", ret);
3021 * Send a watch or unwatch request and wait for the ack. Return a
3022 * request with a ref held on success, or an ERR_PTR on error.
3024 static struct rbd_obj_request *rbd_obj_watch_request_helper(
3025 struct rbd_device *rbd_dev,
3026 bool watch)
3028 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3029 struct rbd_obj_request *obj_request;
3032 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3033 OBJ_REQUEST_NODATA);
3034 if (!obj_request)
3035 return ERR_PTR(-ENOMEM);
3037 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
3038 obj_request);
3039 if (!obj_request->osd_req) {
3040 ret = -ENOMEM;
3041 goto out;
3044 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3045 rbd_dev->watch_event->cookie, 0, watch);
3046 rbd_osd_req_format_write(obj_request);
3048 if (watch)
3049 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3051 ret = rbd_obj_request_submit(osdc, obj_request);
3055 ret = rbd_obj_request_wait(obj_request);
3059 ret = obj_request->result;
3060 if (ret) {
3061 if (watch)
3062 rbd_obj_request_end(obj_request);
3063 goto out;
3066 return obj_request;
3068 out:
3069 rbd_obj_request_put(obj_request);
3070 return ERR_PTR(ret);
3074 * Initiate a watch request, synchronously.
3076 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3078 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3079 struct rbd_obj_request *obj_request;
3082 rbd_assert(!rbd_dev->watch_event);
3083 rbd_assert(!rbd_dev->watch_request);
3085 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3086 &rbd_dev->watch_event);
3090 obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3091 if (IS_ERR(obj_request)) {
3092 ceph_osdc_cancel_event(rbd_dev->watch_event);
3093 rbd_dev->watch_event = NULL;
3094 return PTR_ERR(obj_request);
3098 * A watch request is set to linger, so the underlying osd
3099 * request won't go away until we unregister it. We retain
3100 * a pointer to the object request during that time (in
3101 * rbd_dev->watch_request), so we'll keep a reference to it.
3102 * We'll drop that reference after we've unregistered it in
3103 * rbd_dev_header_unwatch_sync().
3105 rbd_dev->watch_request = obj_request;
3111 * Tear down a watch request, synchronously.
3113 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3115 struct rbd_obj_request *obj_request;
3117 rbd_assert(rbd_dev->watch_event);
3118 rbd_assert(rbd_dev->watch_request);
3120 rbd_obj_request_end(rbd_dev->watch_request);
3121 rbd_obj_request_put(rbd_dev->watch_request);
3122 rbd_dev->watch_request = NULL;
3124 obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3125 if (!IS_ERR(obj_request))
3126 rbd_obj_request_put(obj_request);
3127 else
3128 rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3129 PTR_ERR(obj_request));
3131 ceph_osdc_cancel_event(rbd_dev->watch_event);
3132 rbd_dev->watch_event = NULL;
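/*
 * Sketch of the watch life cycle implemented above:
 * rbd_dev_header_watch_sync() runs at map time and registers
 * rbd_watch_cb() for header object notifications; each notification
 * triggers rbd_dev_refresh() followed by rbd_obj_notify_ack_sync().
 * rbd_dev_header_unwatch_sync() undoes the registration at unmap time.
 */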
3136 * Synchronous osd object method call. Returns the number of bytes
3137 * returned in the inbound buffer, or a negative error code.
3139 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3140 const char *object_name,
3141 const char *class_name,
3142 const char *method_name,
3143 const void *outbound,
3144 size_t outbound_size,
3145 void *inbound,
3146 size_t inbound_size)
3148 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3149 struct rbd_obj_request *obj_request;
3150 struct page **pages;
3155 * Method calls are ultimately read operations. The result
3156 * should be placed into the inbound buffer provided. They
3157 * also supply outbound data--parameters for the object
3158 * method. Currently if this is present it will be a
3159 * snapshot id.
3161 page_count = (u32)calc_pages_for(0, inbound_size);
3162 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3163 if (IS_ERR(pages))
3164 return PTR_ERR(pages);
3166 ret = -ENOMEM;
3167 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3168 OBJ_REQUEST_PAGES);
3169 if (!obj_request)
3170 goto out;
3172 obj_request->pages = pages;
3173 obj_request->page_count = page_count;
3175 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3177 if (!obj_request->osd_req)
3178 goto out;
3180 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3181 class_name, method_name);
3182 if (outbound_size) {
3183 struct ceph_pagelist *pagelist;
3185 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3186 if (!pagelist)
3187 goto out;
3189 ceph_pagelist_init(pagelist);
3190 ceph_pagelist_append(pagelist, outbound, outbound_size);
3191 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3192 pagelist);
3194 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3195 obj_request->pages, inbound_size,
3196 0, false, false);
3197 rbd_osd_req_format_read(obj_request);
3199 ret = rbd_obj_request_submit(osdc, obj_request);
3202 ret = rbd_obj_request_wait(obj_request);
3206 ret = obj_request->result;
3210 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3211 ret = (int)obj_request->xferred;
3212 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3213 out:
3214 if (obj_request)
3215 rbd_obj_request_put(obj_request);
3216 else
3217 ceph_release_page_vector(pages, page_count);
3219 return ret;
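/*
 * Illustrative call (a sketch mirroring uses later in this file): a
 * successful return value is the number of reply bytes copied into
 * the inbound buffer.
 *
 *     char buf[RBD_OBJ_PREFIX_LEN_MAX];
 *     int ret;
 *
 *     ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                               "rbd", "get_object_prefix", NULL, 0,
 *                               buf, sizeof (buf));
 *     if (ret < 0)
 *         return ret;
 */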
3222 static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
3224 struct rbd_img_request *img_request;
3225 struct ceph_snap_context *snapc = NULL;
3226 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3227 u64 length = blk_rq_bytes(rq);
3228 enum obj_operation_type op_type;
3232 if (rq->cmd_flags & REQ_WRITE)
3233 op_type = OBJ_OP_WRITE;
3234 else
3235 op_type = OBJ_OP_READ;
3237 /* Ignore/skip any zero-length requests */
3240 dout("%s: zero-length request\n", __func__);
3245 /* Only reads are allowed to a read-only device */
3247 if (op_type != OBJ_OP_READ) {
3248 if (rbd_dev->mapping.read_only) {
3249 result = -EROFS;
3250 goto err_rq;
3252 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3256 * Quit early if the mapped snapshot no longer exists. It's
3257 * still possible the snapshot will have disappeared by the
3258 * time our request arrives at the osd, but there's no sense in
3259 * sending it if we already know.
3261 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3262 dout("request for non-existent snapshot");
3263 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3264 result = -ENXIO;
3265 goto err_rq;
3268 if (offset && length > U64_MAX - offset + 1) {
3269 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3272 goto err_rq; /* Shouldn't happen */
3275 down_read(&rbd_dev->header_rwsem);
3276 mapping_size = rbd_dev->mapping.size;
3277 if (op_type != OBJ_OP_READ) {
3278 snapc = rbd_dev->header.snapc;
3279 ceph_get_snap_context(snapc);
3281 up_read(&rbd_dev->header_rwsem);
3283 if (offset + length > mapping_size) {
3284 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3285 length, mapping_size);
3286 result = -EIO;
3287 goto err_rq;
3290 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3291 snapc);
3292 if (!img_request) {
3293 result = -ENOMEM;
3294 goto err_rq;
3296 img_request->rq = rq;
3298 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
3299 if (result)
3300 goto err_img_request;
3302 result = rbd_img_request_submit(img_request);
3303 if (result)
3304 goto err_img_request;
3306 return;
3308 err_img_request:
3309 rbd_img_request_put(img_request);
3312 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3313 obj_op_name(op_type), length, offset, result);
3314 if (snapc)
3315 ceph_put_snap_context(snapc);
3316 blk_end_request_all(rq, result);
3319 static void rbd_request_workfn(struct work_struct *work)
3321 struct rbd_device *rbd_dev =
3322 container_of(work, struct rbd_device, rq_work);
3323 struct request *rq, *next;
3324 LIST_HEAD(requests);
3326 spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
3327 list_splice_init(&rbd_dev->rq_queue, &requests);
3328 spin_unlock_irq(&rbd_dev->lock);
3330 list_for_each_entry_safe(rq, next, &requests, queuelist) {
3331 list_del_init(&rq->queuelist);
3332 rbd_handle_request(rbd_dev, rq);
3337 * Called with q->queue_lock held and interrupts disabled, possibly on
3338 * the way to schedule(). Do not sleep here!
3340 static void rbd_request_fn(struct request_queue *q)
3342 struct rbd_device *rbd_dev = q->queuedata;
3346 rbd_assert(rbd_dev);
3348 while ((rq = blk_fetch_request(q))) {
3349 /* Ignore any non-FS requests that filter through. */
3350 if (rq->cmd_type != REQ_TYPE_FS) {
3351 dout("%s: non-fs request type %d\n", __func__,
3352 (int) rq->cmd_type);
3353 __blk_end_request_all(rq, 0);
3354 continue;
3357 list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
3362 queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
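/*
 * Note on the two-stage pattern above: rbd_request_fn() runs under
 * q->queue_lock and must not block, so it only moves requests onto
 * rbd_dev->rq_queue and kicks the workqueue. rbd_request_workfn()
 * then calls rbd_handle_request() in process context, where taking
 * header_rwsem and allocating memory are allowed.
 */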
3366 * A queue callback: make sure we don't create a bio that spans across
3367 * multiple osd objects. One exception is a bio with a single page,
3368 * which we handle later in bio_chain_clone_range().
3370 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3371 struct bio_vec *bvec)
3373 struct rbd_device *rbd_dev = q->queuedata;
3374 sector_t sector_offset;
3375 sector_t sectors_per_obj;
3376 sector_t obj_sector_offset;
3380 * Find how far into its rbd object the partition-relative
3381 * bio start sector is, i.e. its offset relative to the
3382 * enclosing object.
3384 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3385 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3386 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3389 * Compute the number of bytes from that offset to the end
3390 * of the object. Account for what's already used by the bio.
3392 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3393 if (ret > bmd->bi_size)
3394 ret -= bmd->bi_size;
3395 else
3396 ret = 0;
3399 * Don't send back more than was asked for. And if the bio
3400 * was empty, let the whole thing through because: "Note
3401 * that a block device *must* allow a single page to be
3402 * added to an empty bio."
3404 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3405 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3406 ret = (int) bvec->bv_len;
3408 return ret;
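/*
 * Worked example for the arithmetic above (illustrative numbers,
 * assuming the default 4 MiB object size, obj_order 22):
 * sectors_per_obj = 1 << (22 - 9) = 8192. For an absolute start
 * sector of 12000, obj_sector_offset = 12000 & 8191 = 3808, leaving
 * (8192 - 3808) << 9 = 2244608 bytes to the object boundary; the bio
 * may grow by at most that much beyond what it already holds.
 */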
3411 static void rbd_free_disk(struct rbd_device *rbd_dev)
3413 struct gendisk *disk = rbd_dev->disk;
3415 if (!disk)
3416 return;
3418 rbd_dev->disk = NULL;
3419 if (disk->flags & GENHD_FL_UP) {
3420 del_gendisk(disk);
3421 if (disk->queue)
3422 blk_cleanup_queue(disk->queue);
3424 put_disk(disk);
3427 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3428 const char *object_name,
3429 u64 offset, u64 length, void *buf)
3432 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3433 struct rbd_obj_request *obj_request;
3434 struct page **pages = NULL;
3439 page_count = (u32) calc_pages_for(offset, length);
3440 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3441 if (IS_ERR(pages)) {
3442 ret = PTR_ERR(pages);
3443 goto out;
3445 obj_request = rbd_obj_request_create(object_name, offset, length,
3446 OBJ_REQUEST_PAGES);
3447 if (!obj_request)
3448 goto out;
3450 obj_request->pages = pages;
3451 obj_request->page_count = page_count;
3453 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3455 if (!obj_request->osd_req)
3456 goto out;
3458 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3459 offset, length, 0, 0);
3460 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3461 obj_request->pages,
3462 obj_request->length,
3463 obj_request->offset & ~PAGE_MASK,
3464 false, false);
3465 rbd_osd_req_format_read(obj_request);
3467 ret = rbd_obj_request_submit(osdc, obj_request);
3470 ret = rbd_obj_request_wait(obj_request);
3474 ret = obj_request->result;
3478 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3479 size = (size_t) obj_request->xferred;
3480 ceph_copy_from_page_vector(pages, buf, 0, size);
3481 rbd_assert(size <= (size_t)INT_MAX);
3482 ret = (int)size;
3483 out:
3484 if (obj_request)
3485 rbd_obj_request_put(obj_request);
3486 else
3487 ceph_release_page_vector(pages, page_count);
3489 return ret;
3493 * Read the complete header for the given rbd device. On successful
3494 * return, the rbd_dev->header field will contain up-to-date
3495 * information about the image.
3497 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3499 struct rbd_image_header_ondisk *ondisk = NULL;
3506 * The complete header will include an array of its 64-bit
3507 * snapshot ids, followed by the names of those snapshots as
3508 * a contiguous block of NUL-terminated strings. Note that
3509 * the number of snapshots could change by the time we read
3510 * it in, in which case we re-read it.
3516 do {
3517 size = sizeof (*ondisk);
3518 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3520 ondisk = kmalloc(size, GFP_KERNEL);
3524 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3525 0, size, ondisk);
3526 if (ret < 0)
3527 goto out;
3528 if ((size_t)ret < size) {
3530 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3534 if (!rbd_dev_ondisk_valid(ondisk)) {
3536 rbd_warn(rbd_dev, "invalid header");
3540 names_size = le64_to_cpu(ondisk->snap_names_len);
3541 want_count = snap_count;
3542 snap_count = le32_to_cpu(ondisk->snap_count);
3543 } while (snap_count != want_count);
3545 ret = rbd_header_from_disk(rbd_dev, ondisk);
3546 out:
3547 kfree(ondisk);
3549 return ret;
3553 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3554 * has disappeared from the (just updated) snapshot context.
3556 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3560 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3561 return;
3563 snap_id = rbd_dev->spec->snap_id;
3564 if (snap_id == CEPH_NOSNAP)
3565 return;
3567 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3568 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3571 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3577 * Don't hold the lock while doing disk operations,
3578 * or lock ordering will conflict with the bdev mutex via:
3579 * rbd_add() -> blkdev_get() -> rbd_open()
3581 spin_lock_irq(&rbd_dev->lock);
3582 removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3583 spin_unlock_irq(&rbd_dev->lock);
3585 * If the device is being removed, rbd_dev->disk has
3586 * been destroyed, so don't try to update its size.
3588 if (!removing) {
3589 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3590 dout("setting size to %llu sectors", (unsigned long long)size);
3591 set_capacity(rbd_dev->disk, size);
3592 revalidate_disk(rbd_dev->disk);
3596 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3601 down_write(&rbd_dev->header_rwsem);
3602 mapping_size = rbd_dev->mapping.size;
3604 ret = rbd_dev_header_info(rbd_dev);
3605 if (ret)
3606 goto out;
3609 * If there is a parent, see if it has disappeared due to the
3610 * mapped image getting flattened.
3612 if (rbd_dev->parent) {
3613 ret = rbd_dev_v2_parent_info(rbd_dev);
3614 if (ret)
3615 goto out;
3618 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3619 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3620 rbd_dev->mapping.size = rbd_dev->header.image_size;
3622 /* validate mapped snapshot's EXISTS flag */
3623 rbd_exists_validate(rbd_dev);
3625 out:
3626 up_write(&rbd_dev->header_rwsem);
3628 if (mapping_size != rbd_dev->mapping.size)
3629 rbd_dev_update_size(rbd_dev);
3631 return ret;
3634 static int rbd_init_disk(struct rbd_device *rbd_dev)
3636 struct gendisk *disk;
3637 struct request_queue *q;
3640 /* create gendisk info */
3641 disk = alloc_disk(single_major ?
3642 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3643 RBD_MINORS_PER_MAJOR);
3644 if (!disk)
3645 return -ENOMEM;
3647 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3648 rbd_dev->dev_id);
3649 disk->major = rbd_dev->major;
3650 disk->first_minor = rbd_dev->minor;
3651 if (single_major)
3652 disk->flags |= GENHD_FL_EXT_DEVT;
3653 disk->fops = &rbd_bd_ops;
3654 disk->private_data = rbd_dev;
3656 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3660 /* We use the default size, but let's be explicit about it. */
3661 blk_queue_physical_block_size(q, SECTOR_SIZE);
3663 /* set io sizes to object size */
3664 segment_size = rbd_obj_bytes(&rbd_dev->header);
3665 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3666 blk_queue_max_segment_size(q, segment_size);
3667 blk_queue_io_min(q, segment_size);
3668 blk_queue_io_opt(q, segment_size);
3670 blk_queue_merge_bvec(q, rbd_merge_bvec);
3673 q->queuedata = rbd_dev;
3675 rbd_dev->disk = disk;
3688 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3690 return container_of(dev, struct rbd_device, dev);
3693 static ssize_t rbd_size_show(struct device *dev,
3694 struct device_attribute *attr, char *buf)
3696 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3698 return sprintf(buf, "%llu\n",
3699 (unsigned long long)rbd_dev->mapping.size);
3703 * Note this shows the features for whatever's mapped, which is not
3704 * necessarily the base image.
3706 static ssize_t rbd_features_show(struct device *dev,
3707 struct device_attribute *attr, char *buf)
3709 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3711 return sprintf(buf, "0x%016llx\n",
3712 (unsigned long long)rbd_dev->mapping.features);
3715 static ssize_t rbd_major_show(struct device *dev,
3716 struct device_attribute *attr, char *buf)
3718 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3721 return sprintf(buf, "%d\n", rbd_dev->major);
3723 return sprintf(buf, "(none)\n");
3726 static ssize_t rbd_minor_show(struct device *dev,
3727 struct device_attribute *attr, char *buf)
3729 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3731 return sprintf(buf, "%d\n", rbd_dev->minor);
3734 static ssize_t rbd_client_id_show(struct device *dev,
3735 struct device_attribute *attr, char *buf)
3737 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3739 return sprintf(buf, "client%lld\n",
3740 ceph_client_id(rbd_dev->rbd_client->client));
3743 static ssize_t rbd_pool_show(struct device *dev,
3744 struct device_attribute *attr, char *buf)
3746 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3748 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3751 static ssize_t rbd_pool_id_show(struct device *dev,
3752 struct device_attribute *attr, char *buf)
3754 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3756 return sprintf(buf, "%llu\n",
3757 (unsigned long long) rbd_dev->spec->pool_id);
3760 static ssize_t rbd_name_show(struct device *dev,
3761 struct device_attribute *attr, char *buf)
3763 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3765 if (rbd_dev->spec->image_name)
3766 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3768 return sprintf(buf, "(unknown)\n");
3771 static ssize_t rbd_image_id_show(struct device *dev,
3772 struct device_attribute *attr, char *buf)
3774 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3776 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3780 * Shows the name of the currently-mapped snapshot (or
3781 * RBD_SNAP_HEAD_NAME for the base image).
3783 static ssize_t rbd_snap_show(struct device *dev,
3784 struct device_attribute *attr,
3787 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3789 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3793 * For a v2 image, shows the chain of parent images, separated by empty
3794 * lines. For v1 images or if there is no parent, shows "(no parent
3797 static ssize_t rbd_parent_show(struct device *dev,
3798 struct device_attribute *attr,
3801 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3804 if (!rbd_dev->parent)
3805 return sprintf(buf, "(no parent image)\n");
3807 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3808 struct rbd_spec *spec = rbd_dev->parent_spec;
3810 count += sprintf(&buf[count], "%s"
3811 "pool_id %llu\npool_name %s\n"
3812 "image_id %s\nimage_name %s\n"
3813 "snap_id %llu\nsnap_name %s\n"
3815 !count ? "" : "\n", /* first? */
3816 spec->pool_id, spec->pool_name,
3817 spec->image_id, spec->image_name ?: "(unknown)",
3818 spec->snap_id, spec->snap_name,
3819 rbd_dev->parent_overlap);
3822 return count;
3825 static ssize_t rbd_image_refresh(struct device *dev,
3826 struct device_attribute *attr,
3830 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3833 ret = rbd_dev_refresh(rbd_dev);
3834 if (ret)
3835 return ret;
3837 return size;
3840 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3841 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3842 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3843 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3844 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3845 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3846 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3847 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3848 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3849 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3850 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3851 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3853 static struct attribute *rbd_attrs[] = {
3854 &dev_attr_size.attr,
3855 &dev_attr_features.attr,
3856 &dev_attr_major.attr,
3857 &dev_attr_minor.attr,
3858 &dev_attr_client_id.attr,
3859 &dev_attr_pool.attr,
3860 &dev_attr_pool_id.attr,
3861 &dev_attr_name.attr,
3862 &dev_attr_image_id.attr,
3863 &dev_attr_current_snap.attr,
3864 &dev_attr_parent.attr,
3865 &dev_attr_refresh.attr,
3866 NULL
3869 static struct attribute_group rbd_attr_group = {
3870 .attrs = rbd_attrs,
3873 static const struct attribute_group *rbd_attr_groups[] = {
3874 &rbd_attr_group,
3875 NULL
3878 static void rbd_sysfs_dev_release(struct device *dev)
3882 static struct device_type rbd_device_type = {
3883 .name = "rbd",
3884 .groups = rbd_attr_groups,
3885 .release = rbd_sysfs_dev_release,
3888 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3890 kref_get(&spec->kref);
3892 return spec;
3895 static void rbd_spec_free(struct kref *kref);
3896 static void rbd_spec_put(struct rbd_spec *spec)
3898 if (spec)
3899 kref_put(&spec->kref, rbd_spec_free);
3902 static struct rbd_spec *rbd_spec_alloc(void)
3904 struct rbd_spec *spec;
3906 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3907 if (!spec)
3908 return NULL;
3910 spec->pool_id = CEPH_NOPOOL;
3911 spec->snap_id = CEPH_NOSNAP;
3912 kref_init(&spec->kref);
3914 return spec;
3917 static void rbd_spec_free(struct kref *kref)
3919 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3921 kfree(spec->pool_name);
3922 kfree(spec->image_id);
3923 kfree(spec->image_name);
3924 kfree(spec->snap_name);
3925 kfree(spec);
3928 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3929 struct rbd_spec *spec)
3931 struct rbd_device *rbd_dev;
3933 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3934 if (!rbd_dev)
3935 return NULL;
3937 spin_lock_init(&rbd_dev->lock);
3938 INIT_LIST_HEAD(&rbd_dev->rq_queue);
3939 INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
3941 atomic_set(&rbd_dev->parent_ref, 0);
3942 INIT_LIST_HEAD(&rbd_dev->node);
3943 init_rwsem(&rbd_dev->header_rwsem);
3945 rbd_dev->spec = spec;
3946 rbd_dev->rbd_client = rbdc;
3948 /* Initialize the layout used for all rbd requests */
3950 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3951 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3952 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3953 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3955 return rbd_dev;
3958 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3960 rbd_put_client(rbd_dev->rbd_client);
3961 rbd_spec_put(rbd_dev->spec);
3962 kfree(rbd_dev);
3966 * Get the size and object order for an image snapshot, or if
3967 * snap_id is CEPH_NOSNAP, gets this information for the base
3970 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3971 u8 *order, u64 *snap_size)
3973 __le64 snapid = cpu_to_le64(snap_id);
3975 struct {
3976 u8 order;
3977 __le64 size;
3978 } __attribute__ ((packed)) size_buf = { 0 };
3980 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3981 "rbd", "get_size",
3982 &snapid, sizeof (snapid),
3983 &size_buf, sizeof (size_buf));
3984 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3985 if (ret < 0)
3986 return ret;
3987 if (ret < sizeof (size_buf))
3988 return -ERANGE;
3990 if (order) {
3991 *order = size_buf.order;
3992 dout(" order %u", (unsigned int)*order);
3994 *snap_size = le64_to_cpu(size_buf.size);
3996 dout(" snap_id 0x%016llx snap_size = %llu\n",
3997 (unsigned long long)snap_id,
3998 (unsigned long long)*snap_size);
4000 return 0;
4003 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4005 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4006 &rbd_dev->header.obj_order,
4007 &rbd_dev->header.image_size);
4010 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4016 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4017 if (!reply_buf)
4018 return -ENOMEM;
4020 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4021 "rbd", "get_object_prefix", NULL, 0,
4022 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4023 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4024 if (ret < 0)
4025 goto out;
4026 p = reply_buf;
4028 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4029 p + ret, NULL, GFP_NOIO);
4032 if (IS_ERR(rbd_dev->header.object_prefix)) {
4033 ret = PTR_ERR(rbd_dev->header.object_prefix);
4034 rbd_dev->header.object_prefix = NULL;
4036 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4044 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4045 u64 *snap_features)
4047 __le64 snapid = cpu_to_le64(snap_id);
4048 struct {
4049 __le64 features;
4050 __le64 incompat;
4051 } __attribute__ ((packed)) features_buf = { 0 };
4055 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4056 "rbd", "get_features",
4057 &snapid, sizeof (snapid),
4058 &features_buf, sizeof (features_buf));
4059 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4060 if (ret < 0)
4061 return ret;
4062 if (ret < sizeof (features_buf))
4063 return -ERANGE;
4065 incompat = le64_to_cpu(features_buf.incompat);
4066 if (incompat & ~RBD_FEATURES_SUPPORTED)
4067 return -ENXIO;
4069 *snap_features = le64_to_cpu(features_buf.features);
4071 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4072 (unsigned long long)snap_id,
4073 (unsigned long long)*snap_features,
4074 (unsigned long long)le64_to_cpu(features_buf.incompat));
4076 return 0;
4079 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4081 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4082 &rbd_dev->header.features);
4085 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4087 struct rbd_spec *parent_spec;
4089 void *reply_buf = NULL;
4099 parent_spec = rbd_spec_alloc();
4100 if (!parent_spec)
4101 return -ENOMEM;
4103 size = sizeof (__le64) + /* pool_id */
4104 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4105 sizeof (__le64) + /* snap_id */
4106 sizeof (__le64); /* overlap */
4107 reply_buf = kmalloc(size, GFP_KERNEL);
4113 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4114 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4115 "rbd", "get_parent",
4116 &snapid, sizeof (snapid),
4117 reply_buf, size);
4118 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4119 if (ret < 0)
4120 goto out_err;
4121 p = reply_buf;
4123 end = reply_buf + ret;
4125 ceph_decode_64_safe(&p, end, pool_id, out_err);
4126 if (pool_id == CEPH_NOPOOL) {
4128 * Either the parent never existed, or we have
4129 * record of it but the image got flattened so it no
4130 * longer has a parent. When the parent of a
4131 * layered image disappears we immediately set the
4132 * overlap to 0. The effect of this is that all new
4133 * requests will be treated as if the image had no
4136 if (rbd_dev->parent_overlap) {
4137 rbd_dev->parent_overlap = 0;
4139 rbd_dev_parent_put(rbd_dev);
4140 pr_info("%s: clone image has been flattened\n",
4141 rbd_dev->disk->disk_name);
4144 goto out; /* No parent? No problem. */
4147 /* The ceph file layout needs to fit pool id in 32 bits */
4150 if (pool_id > (u64)U32_MAX) {
4151 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4152 (unsigned long long)pool_id, U32_MAX);
4153 ret = -EIO;
4154 goto out_err;
4156 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4157 if (IS_ERR(image_id)) {
4158 ret = PTR_ERR(image_id);
4159 goto out_err;
4161 ceph_decode_64_safe(&p, end, snap_id, out_err);
4162 ceph_decode_64_safe(&p, end, overlap, out_err);
4165 * The parent won't change (except when the clone is
4166 * flattened, which is handled above). So we only need to
4167 * record the parent spec if we have not already done so.
4169 if (!rbd_dev->parent_spec) {
4170 parent_spec->pool_id = pool_id;
4171 parent_spec->image_id = image_id;
4172 parent_spec->snap_id = snap_id;
4173 rbd_dev->parent_spec = parent_spec;
4174 parent_spec = NULL; /* rbd_dev now owns this */
4180 * We always update the parent overlap. If it's zero we
4181 * treat it specially.
4183 rbd_dev->parent_overlap = overlap;
4184 smp_mb();
4185 if (!overlap) {
4187 /* A null parent_spec indicates it's the initial probe */
4188 if (parent_spec) {
4191 * The overlap has become zero, so the clone
4192 * must have been resized down to 0 at some
4193 * point. Treat this the same as a flatten.
4195 rbd_dev_parent_put(rbd_dev);
4196 pr_info("%s: clone image now standalone\n",
4197 rbd_dev->disk->disk_name);
4198 } else {
4200 * For the initial probe, if we find the
4201 * overlap is zero we just pretend there was
4202 * no parent image.
4204 rbd_warn(rbd_dev, "ignoring parent with overlap 0");
4207 out:
4208 ret = 0;
4209 out_err:
4210 kfree(reply_buf);
4211 rbd_spec_put(parent_spec);
4213 return ret;
4216 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4218 struct {
4219 __le64 stripe_unit;
4220 __le64 stripe_count;
4221 } __attribute__ ((packed)) striping_info_buf = { 0 };
4222 size_t size = sizeof (striping_info_buf);
4229 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4230 "rbd", "get_stripe_unit_count", NULL, 0,
4231 (char *)&striping_info_buf, size);
4232 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4239 * We don't actually support the "fancy striping" feature
4240 * (STRIPINGV2) yet, but if the striping sizes are the
4241 * defaults the behavior is the same as before. So find
4242 * out, and only fail if the image has non-default values.
4245 obj_size = (u64)1 << rbd_dev->header.obj_order;
4246 p = &striping_info_buf;
4247 stripe_unit = ceph_decode_64(&p);
4248 if (stripe_unit != obj_size) {
4249 rbd_warn(rbd_dev, "unsupported stripe unit "
4250 "(got %llu want %llu)",
4251 stripe_unit, obj_size);
4252 return -EINVAL;
4254 stripe_count = ceph_decode_64(&p);
4255 if (stripe_count != 1) {
4256 rbd_warn(rbd_dev, "unsupported stripe count "
4257 "(got %llu want 1)", stripe_count);
4260 rbd_dev->header.stripe_unit = stripe_unit;
4261 rbd_dev->header.stripe_count = stripe_count;
4263 return 0;
4266 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4268 size_t image_id_size;
4273 void *reply_buf = NULL;
4275 char *image_name = NULL;
4278 rbd_assert(!rbd_dev->spec->image_name);
4280 len = strlen(rbd_dev->spec->image_id);
4281 image_id_size = sizeof (__le32) + len;
4282 image_id = kmalloc(image_id_size, GFP_KERNEL);
4283 if (!image_id)
4284 return NULL;
4286 p = image_id;
4287 end = image_id + image_id_size;
4288 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4290 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4291 reply_buf = kmalloc(size, GFP_KERNEL);
4295 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4296 "rbd", "dir_get_name",
4297 image_id, image_id_size,
4298 reply_buf, size);
4299 if (ret < 0)
4300 goto out;
4301 p = reply_buf;
4302 end = reply_buf + ret;
4304 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4305 if (IS_ERR(image_name))
4306 image_name = NULL;
4307 else
4308 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4309 out:
4310 kfree(reply_buf);
4311 kfree(image_id);
4313 return image_name;
4316 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4318 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4319 const char *snap_name;
4320 u32 which = 0;
4322 /* Skip over names until we find the one we are looking for */
4324 snap_name = rbd_dev->header.snap_names;
4325 while (which < snapc->num_snaps) {
4326 if (!strcmp(name, snap_name))
4327 return snapc->snaps[which];
4328 snap_name += strlen(snap_name) + 1;
4329 which++;
4332 return CEPH_NOSNAP;
4334 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4336 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4337 u64 snap_id;
4338 bool found = false;
4339 u32 which;
4341 for (which = 0; !found && which < snapc->num_snaps; which++) {
4342 const char *snap_name;
4344 snap_id = snapc->snaps[which];
4345 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4346 if (IS_ERR(snap_name)) {
4347 /* ignore no-longer existing snapshots */
4348 if (PTR_ERR(snap_name) == -ENOENT)
4349 continue;
4350 else
4351 break;
4353 found = !strcmp(name, snap_name);
4354 kfree(snap_name);
4356 return found ? snap_id : CEPH_NOSNAP;
4360 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4361 * no snapshot by that name is found, or if an error occurs.
4363 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4365 if (rbd_dev->image_format == 1)
4366 return rbd_v1_snap_id_by_name(rbd_dev, name);
4368 return rbd_v2_snap_id_by_name(rbd_dev, name);
4372 * An image being mapped will have everything but the snap id.
4374 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4376 struct rbd_spec *spec = rbd_dev->spec;
4378 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4379 rbd_assert(spec->image_id && spec->image_name);
4380 rbd_assert(spec->snap_name);
4382 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4383 u64 snap_id;
4385 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4386 if (snap_id == CEPH_NOSNAP)
4387 return -ENOENT;
4389 spec->snap_id = snap_id;
4390 } else {
4391 spec->snap_id = CEPH_NOSNAP;
4394 return 0;
4398 * A parent image will have all ids but none of the names.
4400 * All names in an rbd spec are dynamically allocated. It's OK if we
4401 * can't figure out the name for an image id.
4403 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4405 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4406 struct rbd_spec *spec = rbd_dev->spec;
4407 const char *pool_name;
4408 const char *image_name;
4409 const char *snap_name;
4412 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4413 rbd_assert(spec->image_id);
4414 rbd_assert(spec->snap_id != CEPH_NOSNAP);
4416 /* Get the pool name; we have to make our own copy of this */
4418 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4420 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4423 pool_name = kstrdup(pool_name, GFP_KERNEL);
4427 /* Fetch the image name; tolerate failure here */
4429 image_name = rbd_dev_image_name(rbd_dev);
4431 rbd_warn(rbd_dev, "unable to get image name");
4433 /* Fetch the snapshot name */
4435 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4436 if (IS_ERR(snap_name)) {
4437 ret = PTR_ERR(snap_name);
4438 goto out_err;
4441 spec->pool_name = pool_name;
4442 spec->image_name = image_name;
4443 spec->snap_name = snap_name;
4445 return 0;
4446 out_err:
4447 kfree(image_name);
4448 kfree(pool_name);
4450 return ret;
4453 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4462 struct ceph_snap_context *snapc;
4466 * We'll need room for the seq value (maximum snapshot id),
4467 * snapshot count, and array of that many snapshot ids.
4468 * For now we have a fixed upper limit on the number we're
4469 * prepared to receive.
4471 size = sizeof (__le64) + sizeof (__le32) +
4472 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4473 reply_buf = kzalloc(size, GFP_KERNEL);
4477 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4478 "rbd", "get_snapcontext", NULL, 0,
4480 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4481 if (ret < 0)
4482 goto out;
4483 p = reply_buf;
4485 end = reply_buf + ret;
4487 ceph_decode_64_safe(&p, end, seq, out);
4488 ceph_decode_32_safe(&p, end, snap_count, out);
4491 * Make sure the reported number of snapshot ids wouldn't go
4492 * beyond the end of our buffer. But before checking that,
4493 * make sure the computed size of the snapshot context we
4494 * allocate is representable in a size_t.
4495 ret = -ERANGE;
4496 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4497 / sizeof (u64))
4498 goto out;
4501 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4502 goto out;
4503 ret = 0;
4505 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4506 if (!snapc) {
4507 ret = -ENOMEM;
4508 goto out;
4510 snapc->seq = seq;
4511 for (i = 0; i < snap_count; i++)
4512 snapc->snaps[i] = ceph_decode_64(&p);
4514 ceph_put_snap_context(rbd_dev->header.snapc);
4515 rbd_dev->header.snapc = snapc;
4517 dout(" snap context seq = %llu, snap_count = %u\n",
4518 (unsigned long long)seq, (unsigned int)snap_count);
4519 out:
4520 kfree(reply_buf);
4522 return ret;
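/*
 * Buffer size sanity check for the call above: with RBD_MAX_SNAP_COUNT
 * of 510, the largest reply is 8 + 4 + 510 * 8 = 4092 bytes, so the
 * whole snapshot context fits in a single 4KB allocation.
 */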
4525 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4526 u64 snap_id)
4536 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4537 reply_buf = kmalloc(size, GFP_KERNEL);
4538 if (!reply_buf)
4539 return ERR_PTR(-ENOMEM);
4541 snapid = cpu_to_le64(snap_id);
4542 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4543 "rbd", "get_snapshot_name",
4544 &snapid, sizeof (snapid),
4545 reply_buf, size);
4546 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4547 if (ret < 0) {
4548 snap_name = ERR_PTR(ret);
4549 goto out;
4552 p = reply_buf;
4553 end = reply_buf + ret;
4554 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4555 if (IS_ERR(snap_name))
4556 goto out;
4558 dout(" snap_id 0x%016llx snap_name = %s\n",
4559 (unsigned long long)snap_id, snap_name);
4560 out:
4561 kfree(reply_buf);
4563 return snap_name;
4566 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4568 bool first_time = rbd_dev->header.object_prefix == NULL;
4571 ret = rbd_dev_v2_image_size(rbd_dev);
4572 if (ret)
4573 return ret;
4575 if (first_time) {
4576 ret = rbd_dev_v2_header_onetime(rbd_dev);
4577 if (ret)
4578 return ret;
4581 ret = rbd_dev_v2_snap_context(rbd_dev);
4582 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4584 return ret;
4587 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4589 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4591 if (rbd_dev->image_format == 1)
4592 return rbd_dev_v1_header_info(rbd_dev);
4594 return rbd_dev_v2_header_info(rbd_dev);
4597 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4602 dev = &rbd_dev->dev;
4603 dev->bus = &rbd_bus_type;
4604 dev->type = &rbd_device_type;
4605 dev->parent = &rbd_root_dev;
4606 dev->release = rbd_dev_device_release;
4607 dev_set_name(dev, "%d", rbd_dev->dev_id);
4608 ret = device_register(dev);
4610 return ret;
4613 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4615 device_unregister(&rbd_dev->dev);
4619 * Get a unique rbd identifier for the given new rbd_dev, and add
4620 * the rbd_dev to the global list.
4622 static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4626 new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4627 0, minor_to_rbd_dev_id(1 << MINORBITS),
4628 GFP_KERNEL);
4629 if (new_dev_id < 0)
4630 return new_dev_id;
4632 rbd_dev->dev_id = new_dev_id;
4634 spin_lock(&rbd_dev_list_lock);
4635 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4636 spin_unlock(&rbd_dev_list_lock);
4638 dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4644 * Remove an rbd_dev from the global list, and record that its
4645 * identifier is no longer in use.
4647 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4649 spin_lock(&rbd_dev_list_lock);
4650 list_del_init(&rbd_dev->node);
4651 spin_unlock(&rbd_dev_list_lock);
4653 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4655 dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4659 * Skips over white space at *buf, and updates *buf to point to the
4660 * first found non-space character (if any). Returns the length of
4661 * the token (string of non-white space characters) found. Note
4662 * that *buf must be terminated with '\0'.
4664 static inline size_t next_token(const char **buf)
4667 * These are the characters that produce nonzero for
4668 * isspace() in the "C" and "POSIX" locales.
4670 const char *spaces = " \f\n\r\t\v";
4672 *buf += strspn(*buf, spaces); /* Find start of token */
4674 return strcspn(*buf, spaces); /* Return token length */
4678 * Finds the next token in *buf, and if the provided token buffer is
4679 * big enough, copies the found token into it. The result, if
4680 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4681 * must be terminated with '\0' on entry.
4683 * Returns the length of the token found (not including the '\0').
4684 * Return value will be 0 if no token is found, and it will be >=
4685 * token_size if the token would not fit.
4687 * The *buf pointer will be updated to point beyond the end of the
4688 * found token. Note that this occurs even if the token buffer is
4689 * too small to hold it.
4691 static inline size_t copy_token(const char **buf,
4692 char *token,
4693 size_t token_size)
4697 len = next_token(buf);
4698 if (len < token_size) {
4699 memcpy(token, *buf, len);
4700 *(token + len) = '\0';
4701 }
4702 *buf += len;
4704 return len;
4708 * Finds the next token in *buf, dynamically allocates a buffer big
4709 * enough to hold a copy of it, and copies the token into the new
4710 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4711 * that a duplicate buffer is created even for a zero-length token.
4713 * Returns a pointer to the newly-allocated duplicate, or a null
4714 * pointer if memory for the duplicate was not available. If
4715 * the lenp argument is a non-null pointer, the length of the token
4716 * (not including the '\0') is returned in *lenp.
4718 * If successful, the *buf pointer will be updated to point beyond
4719 * the end of the found token.
4721 * Note: uses GFP_KERNEL for allocation.
4723 static inline char *dup_token(const char **buf, size_t *lenp)
4728 len = next_token(buf);
4729 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4730 if (!dup)
4731 return NULL;
4732 *(dup + len) = '\0';
4733 *buf += len;
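/*
 * Illustrative sketch, not part of the original source, of how the
 * token helpers above cooperate when walking an options buffer:
 *
 *	const char *buf = "  rbd foo";
 *	size_t len;
 *	char *dup;
 *
 *	len = next_token(&buf);       buf -> "rbd foo", len == 3
 *	dup = dup_token(&buf, &len);  dup == "rbd", buf -> " foo"
 *	kfree(dup);
 */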
4742 * Parse the options provided for an "rbd add" (i.e., rbd image
4743 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4744 * and the data written is passed here via a NUL-terminated buffer.
4745 * Returns 0 if successful or an error code otherwise.
4747 * The information extracted from these options is recorded in
4748 * the other parameters which return dynamically-allocated
4749 * structures:
4750 * ceph_opts
4751 * The address of a pointer that will refer to a ceph options
4752 * structure. Caller must release the returned pointer using
4753 * ceph_destroy_options() when it is no longer needed.
4754 * rbd_opts
4755 * Address of an rbd options pointer. Fully initialized by
4756 * this function; caller must release with kfree().
4757 * spec
4758 * Address of an rbd image specification pointer. Fully
4759 * initialized by this function based on parsed options.
4760 * Caller must release with rbd_spec_put().
4762 * The options passed take this form:
4763 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4764 * where:
4765 * <mon_addrs>
4766 * A comma-separated list of one or more monitor addresses.
4767 * A monitor address is an ip address, optionally followed
4768 * by a port number (separated by a colon).
4769 * I.e.: ip1[:port1][,ip2[:port2]...]
4770 * <options>
4771 * A comma-separated list of ceph and/or rbd options.
4772 * <pool_name>
4773 * The name of the rados pool containing the rbd image.
4774 * <image_name>
4775 * The name of the image in that pool to map.
4776 * <snap_name>
4777 * An optional snapshot name. If provided, the mapping will
4778 * present data from the image at the time that snapshot was
4779 * created. The image head is used if no snapshot name is
4780 * provided. Snapshot mappings are always read-only.
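/*
 * Illustrative example, not part of the original source: a request in
 * the form described above might be written as
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * which maps the head of image "myimage" in pool "rbd"; appending a
 * final token ("... rbd myimage mysnap") would instead map snapshot
 * "mysnap" read-only.
 */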
4782 static int rbd_add_parse_args(const char *buf,
4783 struct ceph_options **ceph_opts,
4784 struct rbd_options **opts,
4785 struct rbd_spec **rbd_spec)
4789 const char *mon_addrs;
4791 size_t mon_addrs_size;
4792 struct rbd_spec *spec = NULL;
4793 struct rbd_options *rbd_opts = NULL;
4794 struct ceph_options *copts;
4797 /* The first four tokens are required */
4799 len = next_token(&buf);
4800 if (!len) {
4801 rbd_warn(NULL, "no monitor address(es) provided");
4802 return -EINVAL;
4803 }
4804 mon_addrs = buf;
4805 mon_addrs_size = len + 1;
4809 options = dup_token(&buf, NULL);
4810 if (!options)
4811 return -ENOMEM;
4812 if (!*options) {
4813 rbd_warn(NULL, "no options provided");
4814 goto out_err;
4815 }
4817 spec = rbd_spec_alloc();
4818 if (!spec)
4819 goto out_mem;
4821 spec->pool_name = dup_token(&buf, NULL);
4822 if (!spec->pool_name)
4823 goto out_mem;
4824 if (!*spec->pool_name) {
4825 rbd_warn(NULL, "no pool name provided");
4826 goto out_err;
4827 }
4829 spec->image_name = dup_token(&buf, NULL);
4830 if (!spec->image_name)
4831 goto out_mem;
4832 if (!*spec->image_name) {
4833 rbd_warn(NULL, "no image name provided");
4834 goto out_err;
4835 }
4838 * Snapshot name is optional; default is to use "-"
4839 * (indicating the head/no snapshot).
4841 len = next_token(&buf);
4842 if (!len) {
4843 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4844 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4845 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4846 ret = -ENAMETOOLONG;
4847 goto out_err;
4848 }
4849 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4850 if (!snap_name)
4851 goto out_mem;
4852 *(snap_name + len) = '\0';
4853 spec->snap_name = snap_name;
4855 /* Initialize all rbd options to the defaults */
4857 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4858 if (!rbd_opts)
4859 goto out_mem;
4861 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4863 copts = ceph_parse_options(options, mon_addrs,
4864 mon_addrs + mon_addrs_size - 1,
4865 parse_rbd_opts_token, rbd_opts);
4866 if (IS_ERR(copts)) {
4867 ret = PTR_ERR(copts);
4868 goto out_err;
4869 }
4888 * Return pool id (>= 0) or a negative error code.
4890 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4892 u64 newest_epoch;
4893 unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
4894 int tries = 0;
4895 int ret;
4897 again:
4898 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
4899 if (ret == -ENOENT && tries++ < 1) {
4900 ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
4901 &newest_epoch);
4902 if (ret < 0)
4903 return ret;
4905 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
4906 ceph_monc_request_next_osdmap(&rbdc->client->monc);
4907 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
4908 newest_epoch, timeout);
4909 goto again;
4910 } else {
4911 /* the osdmap we have is new enough */
4912 return -ENOENT;
4913 }
4916 return ret;
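/*
 * The single retry above covers the case where the pool was created
 * after this client last fetched an osdmap: wait until our map is at
 * least as new as the monitors' version, then repeat the name lookup.
 */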
4920 * An rbd format 2 image has a unique identifier, distinct from the
4921 * name given to it by the user. Internally, that identifier is
4922 * what's used to specify the names of objects related to the image.
4924 * A special "rbd id" object is used to map an rbd image name to its
4925 * id. If that object doesn't exist, then there is no v2 rbd image
4926 * with the supplied name.
4928 * This function will record the given rbd_dev's image_id field if
4929 * it can be determined, and in that case will return 0. If any
4930 * errors occur a negative errno will be returned and the rbd_dev's
4931 * image_id field will be unchanged (and should be NULL).
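/*
 * Illustrative example, not part of the original source: for a format
 * 2 image named "foo" the id object is "rbd_id.foo" (RBD_ID_PREFIX
 * followed by the image name), and its "get_id" method returns the
 * image id (e.g. "10074b0dc51") from which all other object names for
 * the image are derived.
 */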
4933 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4942 * When probing a parent image, the image id is already
4943 * known (and the image name likely is not). There's no
4944 * need to fetch the image id again in this case. We
4945 * do still need to set the image format though.
4947 if (rbd_dev->spec->image_id) {
4948 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4949 return 0;
4950 }
4954 * First, see if the format 2 image id file exists, and if
4955 * so, get the image's persistent id from it.
4957 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4958 object_name = kmalloc(size, GFP_NOIO);
4959 if (!object_name)
4960 return -ENOMEM;
4961 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4962 dout("rbd id object name is %s\n", object_name);
4964 /* Response will be an encoded string, which includes a length */
4966 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4967 response = kzalloc(size, GFP_NOIO);
4968 if (!response) {
4969 ret = -ENOMEM;
4970 goto out;
4971 }
4973 /* If it doesn't exist we'll assume it's a format 1 image */
4975 ret = rbd_obj_method_sync(rbd_dev, object_name,
4976 "rbd", "get_id", NULL, 0,
4977 response, RBD_IMAGE_ID_LEN_MAX);
4978 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4979 if (ret == -ENOENT) {
4980 image_id = kstrdup("", GFP_KERNEL);
4981 ret = image_id ? 0 : -ENOMEM;
4982 if (!ret)
4983 rbd_dev->image_format = 1;
4984 } else if (ret >= 0) {
4985 void *p = response;
4987 image_id = ceph_extract_encoded_string(&p, p + ret,
4988 NULL, GFP_NOIO);
4989 ret = PTR_ERR_OR_ZERO(image_id);
4990 if (!ret)
4991 rbd_dev->image_format = 2;
4992 }
4994 if (!ret) {
4995 rbd_dev->spec->image_id = image_id;
4996 dout("image_id is %s\n", image_id);
4997 }
5006 * Undo whatever state changes are made by v1 or v2 header info
5007 * call.
5009 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5011 struct rbd_image_header *header;
5013 /* Drop parent reference unless it's already been done (or none) */
5015 if (rbd_dev->parent_overlap)
5016 rbd_dev_parent_put(rbd_dev);
5018 /* Free dynamic fields from the header, then zero it out */
5020 header = &rbd_dev->header;
5021 ceph_put_snap_context(header->snapc);
5022 kfree(header->snap_sizes);
5023 kfree(header->snap_names);
5024 kfree(header->object_prefix);
5025 memset(header, 0, sizeof (*header));
5028 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5032 ret = rbd_dev_v2_object_prefix(rbd_dev);
5033 if (ret)
5034 goto out_err;
5037 * Get and check the features for the image. Currently the
5038 * features are assumed to never change.
5040 ret = rbd_dev_v2_features(rbd_dev);
5041 if (ret)
5042 goto out_err;
5044 /* If the image supports fancy striping, get its parameters */
5046 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5047 ret = rbd_dev_v2_striping_info(rbd_dev);
5048 if (ret < 0)
5049 goto out_err;
5050 }
5051 /* No support for crypto and compression type format 2 images */
5053 return 0;
5054 out_err:
5055 rbd_dev->header.features = 0;
5056 kfree(rbd_dev->header.object_prefix);
5057 rbd_dev->header.object_prefix = NULL;
5059 return ret;
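/*
 * Note, assuming the usual format 2 defaults: an image without the
 * STRIPINGV2 feature behaves as if stripe_unit were the object size
 * (1 << obj_order) and stripe_count were 1, so the striping
 * parameters only need to be fetched when the feature bit is set.
 */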
5062 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
5064 struct rbd_device *parent = NULL;
5065 struct rbd_spec *parent_spec;
5066 struct rbd_client *rbdc;
5069 if (!rbd_dev->parent_spec)
5070 return 0;
5072 * We need to pass a reference to the client and the parent
5073 * spec when creating the parent rbd_dev. Images related by
5074 * parent/child relationships always share both.
5076 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
5077 rbdc = __rbd_get_client(rbd_dev->rbd_client);
5079 ret = -ENOMEM;
5080 parent = rbd_dev_create(rbdc, parent_spec);
5081 if (!parent)
5082 goto out_err;
5084 ret = rbd_dev_image_probe(parent, false);
5085 if (ret < 0)
5086 goto out_err;
5087 rbd_dev->parent = parent;
5088 atomic_set(&rbd_dev->parent_ref, 1);
5090 return 0;
5091 out_err:
5092 if (parent) {
5093 rbd_dev_unparent(rbd_dev);
5094 kfree(rbd_dev->header_name);
5095 rbd_dev_destroy(parent);
5096 } else {
5097 rbd_put_client(rbdc);
5098 rbd_spec_put(parent_spec);
5099 }
5101 return ret;
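/*
 * Note that rbd_dev_image_probe(parent, false) above recurses back
 * into rbd_dev_probe_parent(), so a chain of layered images is probed
 * all the way down to its base image, each child holding references
 * to its parent's spec and client.
 */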
5104 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5108 /* Get an id and fill in device name. */
5110 ret = rbd_dev_id_get(rbd_dev);
5111 if (ret)
5112 return ret;
5114 BUILD_BUG_ON(DEV_NAME_LEN
5115 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5116 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5118 /* Record our major and minor device numbers. */
5120 if (!single_major) {
5121 ret = register_blkdev(0, rbd_dev->name);
5122 if (ret < 0)
5123 goto err_out_id;
5125 rbd_dev->major = ret;
5126 rbd_dev->minor = 0;
5127 } else {
5128 rbd_dev->major = rbd_major;
5129 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5130 }
5132 /* Set up the blkdev mapping. */
5134 ret = rbd_init_disk(rbd_dev);
5135 if (ret)
5136 goto err_out_blkdev;
5138 ret = rbd_dev_mapping_set(rbd_dev);
5139 if (ret)
5140 goto err_out_disk;
5142 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5143 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5145 rbd_dev->rq_wq = alloc_workqueue("%s", 0, 0, rbd_dev->disk->disk_name);
5146 if (!rbd_dev->rq_wq) {
5147 ret = -ENOMEM;
5148 goto err_out_mapping;
5149 }
5151 ret = rbd_bus_add_dev(rbd_dev);
5152 if (ret)
5153 goto err_out_workqueue;
5155 /* Everything's ready. Announce the disk to the world. */
5157 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5158 add_disk(rbd_dev->disk);
5160 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5161 (unsigned long long) rbd_dev->mapping.size);
5163 return ret;
5165 err_out_workqueue:
5166 destroy_workqueue(rbd_dev->rq_wq);
5167 rbd_dev->rq_wq = NULL;
5168 err_out_mapping:
5169 rbd_dev_mapping_clear(rbd_dev);
5170 err_out_disk:
5171 rbd_free_disk(rbd_dev);
5172 err_out_blkdev:
5173 if (!single_major)
5174 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5175 err_out_id:
5176 rbd_dev_id_put(rbd_dev);
5177 rbd_dev_mapping_clear(rbd_dev);
5182 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5184 struct rbd_spec *spec = rbd_dev->spec;
5187 /* Record the header object name for this rbd image. */
5189 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5191 if (rbd_dev->image_format == 1)
5192 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
5193 else
5194 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
5196 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
5197 if (!rbd_dev->header_name)
5198 return -ENOMEM;
5200 if (rbd_dev->image_format == 1)
5201 sprintf(rbd_dev->header_name, "%s%s",
5202 spec->image_name, RBD_SUFFIX);
5203 else
5204 sprintf(rbd_dev->header_name, "%s%s",
5205 RBD_HEADER_PREFIX, spec->image_id);
5207 return 0;
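/*
 * Illustrative examples, assuming the rbd_types.h definitions of
 * RBD_SUFFIX (".rbd") and RBD_HEADER_PREFIX ("rbd_header."): a format
 * 1 image named "foo" gets header object "foo.rbd", while a format 2
 * image with id "10074b0dc51" gets header object
 * "rbd_header.10074b0dc51".
 */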
5209 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5211 rbd_dev_unprobe(rbd_dev);
5212 kfree(rbd_dev->header_name);
5213 rbd_dev->header_name = NULL;
5214 rbd_dev->image_format = 0;
5215 kfree(rbd_dev->spec->image_id);
5216 rbd_dev->spec->image_id = NULL;
5218 rbd_dev_destroy(rbd_dev);
5222 * Probe for the existence of the header object for the given rbd
5223 * device. If this image is the one being mapped (i.e., not a
5224 * parent), initiate a watch on its header object before using that
5225 * object to get detailed information about the rbd image.
5227 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5232 * Get the id from the image id object. Unless there's an
5233 * error, rbd_dev->spec->image_id will be filled in with
5234 * a dynamically-allocated string, and rbd_dev->image_format
5235 * will be set to either 1 or 2.
5237 ret = rbd_dev_image_id(rbd_dev);
5238 if (ret)
5239 return ret;
5241 ret = rbd_dev_header_name(rbd_dev);
5242 if (ret)
5243 goto err_out_format;
5245 if (mapping) {
5246 ret = rbd_dev_header_watch_sync(rbd_dev);
5247 if (ret)
5248 goto out_header_name;
5249 }
5251 ret = rbd_dev_header_info(rbd_dev);
5252 if (ret)
5253 goto err_out_watch;
5256 * If this image is the one being mapped, we have pool name and
5257 * id, image name and id, and snap name - need to fill snap id.
5258 * Otherwise this is a parent image, identified by pool, image
5259 * and snap ids - need to fill in names for those ids.
5261 if (mapping)
5262 ret = rbd_spec_fill_snap_id(rbd_dev);
5263 else
5264 ret = rbd_spec_fill_names(rbd_dev);
5265 if (ret)
5266 goto err_out_probe;
5268 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5269 ret = rbd_dev_v2_parent_info(rbd_dev);
5270 if (ret)
5271 goto err_out_probe;
5274 * Need to warn users if this image is the one being
5275 * mapped and has a parent.
5277 if (mapping && rbd_dev->parent_spec)
5278 rbd_warn(rbd_dev,
5279 "WARNING: kernel layering is EXPERIMENTAL!");
5280 }
5282 ret = rbd_dev_probe_parent(rbd_dev);
5283 if (ret)
5284 goto err_out_probe;
5286 dout("discovered format %u image, header name is %s\n",
5287 rbd_dev->image_format, rbd_dev->header_name);
5288 return 0;
5290 err_out_probe:
5291 rbd_dev_unprobe(rbd_dev);
5292 err_out_watch:
5293 if (mapping)
5294 rbd_dev_header_unwatch_sync(rbd_dev);
5295 out_header_name:
5296 kfree(rbd_dev->header_name);
5297 rbd_dev->header_name = NULL;
5298 err_out_format:
5299 rbd_dev->image_format = 0;
5300 kfree(rbd_dev->spec->image_id);
5301 rbd_dev->spec->image_id = NULL;
5302 return ret;
5305 static ssize_t do_rbd_add(struct bus_type *bus,
5306 const char *buf,
5307 size_t count)
5309 struct rbd_device *rbd_dev = NULL;
5310 struct ceph_options *ceph_opts = NULL;
5311 struct rbd_options *rbd_opts = NULL;
5312 struct rbd_spec *spec = NULL;
5313 struct rbd_client *rbdc;
5317 if (!try_module_get(THIS_MODULE))
5318 return -ENODEV;
5320 /* parse add command */
5321 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5322 if (rc < 0)
5323 goto err_out_module;
5324 read_only = rbd_opts->read_only;
5325 kfree(rbd_opts);
5326 rbd_opts = NULL; /* done with this */
5328 rbdc = rbd_get_client(ceph_opts);
5329 if (IS_ERR(rbdc)) {
5330 rc = PTR_ERR(rbdc);
5331 goto err_out_args;
5332 }
5334 /* pick the pool */
5335 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
5336 if (rc < 0)
5337 goto err_out_client;
5338 spec->pool_id = (u64)rc;
5340 /* The ceph file layout needs to fit pool id in 32 bits */
5342 if (spec->pool_id > (u64)U32_MAX) {
5343 rbd_warn(NULL, "pool id too large (%llu > %u)",
5344 (unsigned long long)spec->pool_id, U32_MAX);
5345 rc = -EIO;
5346 goto err_out_client;
5347 }
5349 rbd_dev = rbd_dev_create(rbdc, spec);
5350 if (!rbd_dev)
5351 goto err_out_client;
5352 rbdc = NULL; /* rbd_dev now owns this */
5353 spec = NULL; /* rbd_dev now owns this */
5355 rc = rbd_dev_image_probe(rbd_dev, true);
5356 if (rc < 0)
5357 goto err_out_rbd_dev;
5359 /* If we are mapping a snapshot it must be marked read-only */
5361 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5362 read_only = true;
5363 rbd_dev->mapping.read_only = read_only;
5365 rc = rbd_dev_device_setup(rbd_dev);
5366 if (rc) {
5368 * rbd_dev_header_unwatch_sync() can't be moved into
5369 * rbd_dev_image_release() without refactoring, see
5370 * commit 1f3ef78861ac.
5372 rbd_dev_header_unwatch_sync(rbd_dev);
5373 rbd_dev_image_release(rbd_dev);
5374 goto err_out_module;
5375 }
5377 return count;
5379 err_out_rbd_dev:
5380 rbd_dev_destroy(rbd_dev);
5381 err_out_client:
5382 rbd_put_client(rbdc);
5383 err_out_args:
5384 rbd_spec_put(spec);
5385 err_out_module:
5386 module_put(THIS_MODULE);
5388 dout("Error adding device %s\n", buf);
5390 return (ssize_t)rc;
5393 static ssize_t rbd_add(struct bus_type *bus,
5394 const char *buf,
5395 size_t count)
5397 if (single_major)
5398 return -EINVAL;
5400 return do_rbd_add(bus, buf, count);
5403 static ssize_t rbd_add_single_major(struct bus_type *bus,
5404 const char *buf,
5405 size_t count)
5407 return do_rbd_add(bus, buf, count);
5410 static void rbd_dev_device_release(struct device *dev)
5412 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5414 destroy_workqueue(rbd_dev->rq_wq);
5415 rbd_free_disk(rbd_dev);
5416 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5417 rbd_dev_mapping_clear(rbd_dev);
5418 if (!single_major)
5419 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5420 rbd_dev_id_put(rbd_dev);
5421 rbd_dev_mapping_clear(rbd_dev);
5424 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5426 while (rbd_dev->parent) {
5427 struct rbd_device *first = rbd_dev;
5428 struct rbd_device *second = first->parent;
5429 struct rbd_device *third;
5432 * Follow to the parent with no grandparent and
5433 * remove it.
5435 while (second && (third = second->parent)) {
5436 first = second;
5437 second = third;
5438 }
5439 rbd_assert(second);
5440 rbd_dev_image_release(second);
5441 first->parent = NULL;
5442 first->parent_overlap = 0;
5444 rbd_assert(first->parent_spec);
5445 rbd_spec_put(first->parent_spec);
5446 first->parent_spec = NULL;
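/*
 * Note that each pass of the outer loop above unlinks and releases
 * the image at the far end of the parent chain, so a multi-level
 * chain is torn down base-first; rbd_dev itself is released by the
 * caller.
 */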
5450 static ssize_t do_rbd_remove(struct bus_type *bus,
5451 const char *buf,
5452 size_t count)
5454 struct rbd_device *rbd_dev = NULL;
5455 struct list_head *tmp;
5456 int dev_id;
5457 unsigned long ul;
5458 bool already = false;
5459 int ret;
5461 ret = kstrtoul(buf, 10, &ul);
5462 if (ret)
5463 return ret;
5465 /* convert to int; abort if we lost anything in the conversion */
5466 dev_id = (int)ul;
5467 if (dev_id != ul)
5468 return -EINVAL;
5470 ret = -ENOENT;
5471 spin_lock(&rbd_dev_list_lock);
5472 list_for_each(tmp, &rbd_dev_list) {
5473 rbd_dev = list_entry(tmp, struct rbd_device, node);
5474 if (rbd_dev->dev_id == dev_id) {
5475 ret = 0;
5476 break;
5477 }
5478 }
5479 if (!ret) {
5480 spin_lock_irq(&rbd_dev->lock);
5481 if (rbd_dev->open_count)
5482 ret = -EBUSY;
5483 else
5484 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5485 &rbd_dev->flags);
5486 spin_unlock_irq(&rbd_dev->lock);
5487 }
5488 spin_unlock(&rbd_dev_list_lock);
5489 if (ret < 0 || already)
5490 return ret;
5492 rbd_dev_header_unwatch_sync(rbd_dev);
5494 * flush remaining watch callbacks - these must be complete
5495 * before the osd_client is shutdown
5497 dout("%s: flushing notifies", __func__);
5498 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5501 * Don't free anything from rbd_dev->disk until after all
5502 * notifies are completely processed. Otherwise
5503 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5504 * in a potential use after free of rbd_dev->disk or rbd_dev.
5506 rbd_bus_del_dev(rbd_dev);
5507 rbd_dev_image_release(rbd_dev);
5508 module_put(THIS_MODULE);
5510 return count;
5513 static ssize_t rbd_remove(struct bus_type *bus,
5514 const char *buf,
5515 size_t count)
5517 if (single_major)
5518 return -EINVAL;
5520 return do_rbd_remove(bus, buf, count);
5523 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5524 const char *buf,
5525 size_t count)
5527 return do_rbd_remove(bus, buf, count);
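/*
 * Illustrative example, not part of the original source: a mapped
 * device is removed by writing its device id (the N in /dev/rbdN) to
 * the bus control file:
 *
 *	$ echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is held open.
 */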
5531 * create control files in sysfs
5534 static int rbd_sysfs_init(void)
5538 ret = device_register(&rbd_root_dev);
5539 if (ret < 0)
5540 return ret;
5542 ret = bus_register(&rbd_bus_type);
5543 if (ret < 0)
5544 device_unregister(&rbd_root_dev);
5546 return ret;
5549 static void rbd_sysfs_cleanup(void)
5551 bus_unregister(&rbd_bus_type);
5552 device_unregister(&rbd_root_dev);
5555 static int rbd_slab_init(void)
5557 rbd_assert(!rbd_img_request_cache);
5558 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5559 sizeof (struct rbd_img_request),
5560 __alignof__(struct rbd_img_request),
5561 0, NULL);
5562 if (!rbd_img_request_cache)
5563 return -ENOMEM;
5565 rbd_assert(!rbd_obj_request_cache);
5566 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5567 sizeof (struct rbd_obj_request),
5568 __alignof__(struct rbd_obj_request),
5569 0, NULL);
5570 if (!rbd_obj_request_cache)
5571 goto out_err;
5573 rbd_assert(!rbd_segment_name_cache);
5574 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5575 CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5576 if (rbd_segment_name_cache)
5577 return 0;
5578 out_err:
5579 if (rbd_obj_request_cache) {
5580 kmem_cache_destroy(rbd_obj_request_cache);
5581 rbd_obj_request_cache = NULL;
5584 kmem_cache_destroy(rbd_img_request_cache);
5585 rbd_img_request_cache = NULL;
5587 return -ENOMEM;
5590 static void rbd_slab_exit(void)
5592 rbd_assert(rbd_segment_name_cache);
5593 kmem_cache_destroy(rbd_segment_name_cache);
5594 rbd_segment_name_cache = NULL;
5596 rbd_assert(rbd_obj_request_cache);
5597 kmem_cache_destroy(rbd_obj_request_cache);
5598 rbd_obj_request_cache = NULL;
5600 rbd_assert(rbd_img_request_cache);
5601 kmem_cache_destroy(rbd_img_request_cache);
5602 rbd_img_request_cache = NULL;
5605 static int __init rbd_init(void)
5609 if (!libceph_compatible(NULL)) {
5610 rbd_warn(NULL, "libceph incompatibility (quitting)");
5611 return -EINVAL;
5612 }
5614 rc = rbd_slab_init();
5615 if (rc)
5616 return rc;
5618 if (single_major) {
5619 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5620 if (rbd_major < 0) {
5621 rc = rbd_major;
5622 goto err_out_slab;
5623 }
5624 }
5626 rc = rbd_sysfs_init();
5627 if (rc)
5628 goto err_out_blkdev;
5630 if (single_major)
5631 pr_info("loaded (major %d)\n", rbd_major);
5632 else
5633 pr_info("loaded\n");
5635 return 0;
5637 err_out_blkdev:
5638 if (single_major)
5639 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5640 err_out_slab:
5641 rbd_slab_exit();
5642 return rc;
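/*
 * Illustrative note, assuming the single_major module parameter
 * declared earlier in this file: loading with "modprobe rbd
 * single_major=Y" makes all images share one major number, with
 * minors carved out per device id, instead of registering a separate
 * major for every mapped image.
 */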
5645 static void __exit rbd_exit(void)
5647 ida_destroy(&rbd_dev_id_ida);
5648 rbd_sysfs_cleanup();
5649 if (single_major)
5650 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5651 rbd_slab_exit();
5654 module_init(rbd_init);
5655 module_exit(rbd_exit);
5657 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5658 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5659 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5660 /* following authorship retained from original osdblk.c */
5661 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5663 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5664 MODULE_LICENSE("GPL");