3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
45 #include "rbd_types.h"
47 #define RBD_DEBUG /* Activate rbd_assert() calls */
50 * The basic unit of block I/O is a sector. It is interpreted in a
51 * number of contexts in Linux (blk, bio, genhd), but the default is
52 * universally 512 bytes. These symbols are just slightly more
53 * meaningful than the bare numbers they represent.
55 #define SECTOR_SHIFT 9
56 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
58 #define RBD_DRV_NAME "rbd"
59 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
64 #define RBD_MAX_SNAP_NAME_LEN \
65 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_SNAP_HEAD_NAME "-"
71 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX 64
77 #define RBD_OBJ_PREFIX_LEN_MAX 64
81 #define RBD_FEATURE_LAYERING (1<<0)
82 #define RBD_FEATURE_STRIPINGV2 (1<<1)
83 #define RBD_FEATURES_ALL \
84 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
86 /* Features supported by this (client software) implementation. */
88 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
91 * An RBD device name will be "rbd#", where the "rbd" comes from
92 * RBD_DRV_NAME above, and # is a unique integer identifier.
93 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
94 * enough to hold all possible device names.
96 #define DEV_NAME_LEN 32
97 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
100 * block device image metadata (in-memory version)
102 struct rbd_image_header {
103 /* These six fields never change for a given rbd image */
110 u64 features; /* Might be changeable someday? */
112 /* The remaining fields need to be updated occasionally */
114 struct ceph_snap_context *snapc;
115 char *snap_names; /* format 1 only */
116 u64 *snap_sizes; /* format 1 only */
120 * An rbd image specification.
122 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
123 * identify an image. Each rbd_dev structure includes a pointer to
124 * an rbd_spec structure that encapsulates this identity.
126 * Each of the id's in an rbd_spec has an associated name. For a
127 * user-mapped image, the names are supplied and the id's associated
128 * with them are looked up. For a layered image, a parent image is
129 * defined by the tuple, and the names are looked up.
131 * An rbd_dev structure contains a parent_spec pointer which is
132 * non-null if the image it represents is a child in a layered
133 * image. This pointer will refer to the rbd_spec structure used
134 * by the parent rbd_dev for its own identity (i.e., the structure
135 * is shared between the parent and child).
137 * Since these structures are populated once, during the discovery
138 * phase of image construction, they are effectively immutable so
139 * we make no effort to synchronize access to them.
141 * Note that code herein does not assume the image name is known (it
142 * could be a null pointer).
146 const char *pool_name;
148 const char *image_id;
149 const char *image_name;
152 const char *snap_name;
158 * an instance of the client. multiple devices may share an rbd client.
161 struct ceph_client *client;
163 struct list_head node;
166 struct rbd_img_request;
167 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
169 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
171 struct rbd_obj_request;
172 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
174 enum obj_request_type {
175 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
179 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
180 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
181 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
182 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
185 struct rbd_obj_request {
186 const char *object_name;
187 u64 offset; /* object start byte */
188 u64 length; /* bytes from offset */
192 * An object request associated with an image will have its
193 * img_data flag set; a standalone object request will not.
195 * A standalone object request will have which == BAD_WHICH
196 * and a null obj_request pointer.
198 * An object request initiated in support of a layered image
199 * object (to check for its existence before a write) will
200 * have which == BAD_WHICH and a non-null obj_request pointer.
202 * Finally, an object request for rbd image data will have
203 * which != BAD_WHICH, and will have a non-null img_request
204 * pointer. The value of which will be in the range
205 * 0..(img_request->obj_request_count-1).
208 struct rbd_obj_request *obj_request; /* STAT op */
210 struct rbd_img_request *img_request;
212 /* links for img_request->obj_requests list */
213 struct list_head links;
216 u32 which; /* posn image request list */
218 enum obj_request_type type;
220 struct bio *bio_list;
226 struct page **copyup_pages;
227 u32 copyup_page_count;
229 struct ceph_osd_request *osd_req;
231 u64 xferred; /* bytes transferred */
234 rbd_obj_callback_t callback;
235 struct completion completion;
241 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
242 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
243 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
246 struct rbd_img_request {
247 struct rbd_device *rbd_dev;
248 u64 offset; /* starting image byte offset */
249 u64 length; /* byte count from offset */
252 u64 snap_id; /* for reads */
253 struct ceph_snap_context *snapc; /* for writes */
256 struct request *rq; /* block request */
257 struct rbd_obj_request *obj_request; /* obj req initiator */
259 struct page **copyup_pages;
260 u32 copyup_page_count;
261 spinlock_t completion_lock;/* protects next_completion */
263 rbd_img_callback_t callback;
264 u64 xferred;/* aggregate bytes transferred */
265 int result; /* first nonzero obj_request result */
267 u32 obj_request_count;
268 struct list_head obj_requests; /* rbd_obj_request structs */
/*
 * Iterators over the object requests linked on an image request's
 * obj_requests list (through each object request's "links" member).
 */
/* Walk every object request on the list, first to last. */
273 #define for_each_obj_request(ireq, oreq) \
274 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
/* Continue a walk starting at the current value of oreq. */
275 #define for_each_obj_request_from(ireq, oreq) \
276 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
/*
 * Removal-tolerant walk.  Despite the name not saying so, this one runs
 * in REVERSE (last to first): it is built on
 * list_for_each_entry_safe_reverse().  n is scratch storage for the
 * entry to visit next.
 */
277 #define for_each_obj_request_safe(ireq, oreq, n) \
278 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
290 int dev_id; /* blkdev unique id */
292 int major; /* blkdev assigned major */
293 struct gendisk *disk; /* blkdev's gendisk and rq */
295 u32 image_format; /* Either 1 or 2 */
296 struct rbd_client *rbd_client;
298 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
300 spinlock_t lock; /* queue, flags, open_count */
302 struct rbd_image_header header;
303 unsigned long flags; /* possibly lock protected */
304 struct rbd_spec *spec;
308 struct ceph_file_layout layout;
310 struct ceph_osd_event *watch_event;
311 struct rbd_obj_request *watch_request;
313 struct rbd_spec *parent_spec;
315 struct rbd_device *parent;
317 /* protects updating the header */
318 struct rw_semaphore header_rwsem;
320 struct rbd_mapping mapping;
322 struct list_head node;
326 unsigned long open_count; /* protected by lock */
330 * Flag bits for rbd_dev->flags. If atomicity is required,
331 * rbd_dev->lock is used to protect access.
333 * Currently, only the "removing" flag (which is coupled with the
334 * "open_count" field) requires atomic access.
337 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
338 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
341 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
343 static LIST_HEAD(rbd_dev_list); /* devices */
344 static DEFINE_SPINLOCK(rbd_dev_list_lock);
346 static LIST_HEAD(rbd_client_list); /* clients */
347 static DEFINE_SPINLOCK(rbd_client_list_lock);
349 /* Slab caches for frequently-allocated structures */
351 static struct kmem_cache *rbd_img_request_cache;
352 static struct kmem_cache *rbd_obj_request_cache;
353 static struct kmem_cache *rbd_segment_name_cache;
355 static int rbd_img_request_submit(struct rbd_img_request *img_request);
357 static void rbd_dev_device_release(struct device *dev);
359 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
361 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
363 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
365 static struct bus_attribute rbd_bus_attrs[] = {
366 __ATTR(add, S_IWUSR, NULL, rbd_add),
367 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
371 static struct bus_type rbd_bus_type = {
373 .bus_attrs = rbd_bus_attrs,
376 static void rbd_root_dev_release(struct device *dev)
380 static struct device rbd_root_dev = {
382 .release = rbd_root_dev_release,
/*
 * Log a printf-style warning at KERN_WARNING, prefixed with the driver
 * name and the best identification available for rbd_dev.  The visible
 * branches pick, in order: the disk name when rbd_dev->disk is set, the
 * image name from rbd_dev->spec, the image id from rbd_dev->spec, and
 * finally the rbd_dev pointer value; one branch prints the driver name
 * alone.
 *
 * Every format used here already ends in "\n", so the caller's fmt
 * should not carry a trailing newline of its own.
 */
385 static __printf(2, 3)
386 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
388 struct va_format vaf;
/* The caller's format and arguments are printed through %pV. */
396 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
397 else if (rbd_dev->disk)
398 printk(KERN_WARNING "%s: %s: %pV\n",
399 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
400 else if (rbd_dev->spec && rbd_dev->spec->image_name)
401 printk(KERN_WARNING "%s: image %s: %pV\n",
402 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
403 else if (rbd_dev->spec && rbd_dev->spec->image_id)
404 printk(KERN_WARNING "%s: id %s: %pV\n",
405 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
407 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
408 RBD_DRV_NAME, rbd_dev, &vaf);
/*
 * rbd_assert(expr): in the debug variant, a false expr is reported with
 * printk(KERN_ERR), naming the enclosing function, the source line and
 * the text of the expression.  In the non-debug variant it expands to
 * ((void) 0), so expr is not evaluated at all; expressions passed here
 * must therefore be free of side effects.
 *
 * NOTE(review): the debug expansion starts with a bare "if (...) {"
 * rather than being wrapped in do { ... } while (0).  Used as the
 * unbraced body of an if that has an else, the else would bind to the
 * macro's if.  Consider wrapping the expansion -- confirm no caller
 * depends on the current form.
 */
413 #define rbd_assert(expr) \
414 if (unlikely(!(expr))) { \
415 printk(KERN_ERR "\nAssertion failure in %s() " \
417 "\trbd_assert(%s);\n\n", \
418 __func__, __LINE__, #expr); \
421 #else /* !RBD_DEBUG */
422 # define rbd_assert(expr) ((void) 0)
423 #endif /* !RBD_DEBUG */
425 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
426 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
427 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
429 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
430 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
431 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
432 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
434 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
435 u8 *order, u64 *snap_size);
436 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
438 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
/*
 * Block device open handler.  Operates on the rbd_device stored in the
 * gendisk's private_data.  The visible steps are:
 *  - a check for a write-mode open of a read-only mapping;
 *  - under rbd_dev->lock, a test of the REMOVING flag and an increment
 *    of open_count;
 *  - under ctl_mutex, taking a reference on the embedded struct device
 *    and propagating the mapping's read-only state to bdev.
 */
440 static int rbd_open(struct block_device *bdev, fmode_t mode)
442 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
443 bool removing = false;
445 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
/* The flag test and the open_count update share one critical section. */
448 spin_lock_irq(&rbd_dev->lock);
449 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
452 rbd_dev->open_count++;
453 spin_unlock_irq(&rbd_dev->lock);
457 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
/* get_device()'s return value is deliberately discarded. */
458 (void) get_device(&rbd_dev->dev);
459 set_device_ro(bdev, rbd_dev->mapping.read_only);
460 mutex_unlock(&ctl_mutex);
/*
 * Block device release handler.  Decrements open_count under
 * rbd_dev->lock, then drops a reference on the embedded struct device
 * with put_device() while holding ctl_mutex.
 */
465 static int rbd_release(struct gendisk *disk, fmode_t mode)
467 struct rbd_device *rbd_dev = disk->private_data;
468 unsigned long open_count_before;
470 spin_lock_irq(&rbd_dev->lock);
/* Post-decrement: captures the count as it was before this release. */
471 open_count_before = rbd_dev->open_count--;
472 spin_unlock_irq(&rbd_dev->lock);
/* Checked after the decrement has already happened and the lock is dropped. */
473 rbd_assert(open_count_before > 0);
475 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
476 put_device(&rbd_dev->dev);
477 mutex_unlock(&ctl_mutex);
482 static const struct block_device_operations rbd_bd_ops = {
483 .owner = THIS_MODULE,
485 .release = rbd_release,
489 * Initialize an rbd client instance.
492 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
494 struct rbd_client *rbdc;
497 dout("%s:\n", __func__);
498 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
502 kref_init(&rbdc->kref);
503 INIT_LIST_HEAD(&rbdc->node);
505 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
507 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
508 if (IS_ERR(rbdc->client))
510 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
512 ret = ceph_open_session(rbdc->client);
516 spin_lock(&rbd_client_list_lock);
517 list_add_tail(&rbdc->node, &rbd_client_list);
518 spin_unlock(&rbd_client_list_lock);
520 mutex_unlock(&ctl_mutex);
521 dout("%s: rbdc %p\n", __func__, rbdc);
526 ceph_destroy_client(rbdc->client);
528 mutex_unlock(&ctl_mutex);
532 ceph_destroy_options(ceph_opts);
533 dout("%s: error %d\n", __func__, ret);
538 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
540 kref_get(&rbdc->kref);
546 * Find a ceph client with specific addr and configuration. If
547 * found, bump its reference count.
/*
 * Scans rbd_client_list under rbd_client_list_lock, comparing ceph_opts
 * against each listed client with ceph_compare_options(); a zero result
 * is treated as a match, and a reference is taken on that client via
 * __rbd_get_client().  Options carrying CEPH_OPT_NOSHARE are tested for
 * before the scan.  Returns the matching client, or NULL.
 */
549 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
551 struct rbd_client *client_node;
554 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
557 spin_lock(&rbd_client_list_lock);
558 list_for_each_entry(client_node, &rbd_client_list, node) {
559 if (!ceph_compare_options(ceph_opts, client_node->client)) {
/* The reference is taken while the list lock is still held. */
560 __rbd_get_client(client_node);
566 spin_unlock(&rbd_client_list_lock);
568 return found ? client_node : NULL;
578 /* string args above */
581 /* Boolean args above */
585 static match_table_t rbd_opts_tokens = {
587 /* string args above */
588 {Opt_read_only, "read_only"},
589 {Opt_read_only, "ro"}, /* Alternate spelling */
590 {Opt_read_write, "read_write"},
591 {Opt_read_write, "rw"}, /* Alternate spelling */
592 /* Boolean args above */
600 #define RBD_READ_ONLY_DEFAULT false
602 static int parse_rbd_opts_token(char *c, void *private)
604 struct rbd_options *rbd_opts = private;
605 substring_t argstr[MAX_OPT_ARGS];
606 int token, intval, ret;
608 token = match_token(c, rbd_opts_tokens, argstr);
612 if (token < Opt_last_int) {
613 ret = match_int(&argstr[0], &intval);
615 pr_err("bad mount option arg (not int) "
619 dout("got int token %d val %d\n", token, intval);
620 } else if (token > Opt_last_int && token < Opt_last_string) {
621 dout("got string token %d val %s\n", token,
623 } else if (token > Opt_last_string && token < Opt_last_bool) {
624 dout("got Boolean token %d\n", token);
626 dout("got token %d\n", token);
631 rbd_opts->read_only = true;
634 rbd_opts->read_only = false;
644 * Get a ceph client with specific addr and configuration; if one does
645 * not exist, create it.
647 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
649 struct rbd_client *rbdc;
651 rbdc = rbd_client_find(ceph_opts);
652 if (rbdc) /* using an existing client */
653 ceph_destroy_options(ceph_opts);
655 rbdc = rbd_client_create(ceph_opts);
661 * Destroy ceph client
663 * Takes rbd_client_list_lock itself to unlink the client, so the caller must not hold it.
665 static void rbd_client_release(struct kref *kref)
667 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
669 dout("%s: rbdc %p\n", __func__, rbdc);
670 spin_lock(&rbd_client_list_lock);
671 list_del(&rbdc->node);
672 spin_unlock(&rbd_client_list_lock);
674 ceph_destroy_client(rbdc->client);
679 * Drop reference to ceph client node. If it's not referenced anymore, release
682 static void rbd_put_client(struct rbd_client *rbdc)
685 kref_put(&rbdc->kref, rbd_client_release);
688 static bool rbd_image_format_valid(u32 image_format)
690 return image_format == 1 || image_format == 2;
693 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
698 /* The header has to start with the magic rbd header text */
699 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
702 /* The bio layer requires at least sector-sized I/O */
704 if (ondisk->options.order < SECTOR_SHIFT)
707 /* If we use u64 in a few spots we may be able to loosen this */
709 if (ondisk->options.order > 8 * sizeof (int) - 1)
713 * The size of a snapshot header has to fit in a size_t, and
714 * that limits the number of snapshots.
716 snap_count = le32_to_cpu(ondisk->snap_count);
717 size = SIZE_MAX - sizeof (struct ceph_snap_context);
718 if (snap_count > size / sizeof (__le64))
722 * Not only that, but the size of the entire snapshot
723 * header must also be representable in a size_t.
725 size -= snap_count * sizeof (__le64);
726 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
733 * Fill an rbd image header with information from the given format 1
736 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
737 struct rbd_image_header_ondisk *ondisk)
739 struct rbd_image_header *header = &rbd_dev->header;
740 bool first_time = header->object_prefix == NULL;
741 struct ceph_snap_context *snapc;
742 char *object_prefix = NULL;
743 char *snap_names = NULL;
744 u64 *snap_sizes = NULL;
750 /* Allocate this now to avoid having to handle failure below */
755 len = strnlen(ondisk->object_prefix,
756 sizeof (ondisk->object_prefix));
757 object_prefix = kmalloc(len + 1, GFP_KERNEL);
760 memcpy(object_prefix, ondisk->object_prefix, len);
761 object_prefix[len] = '\0';
764 /* Allocate the snapshot context and fill it in */
766 snap_count = le32_to_cpu(ondisk->snap_count);
767 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
770 snapc->seq = le64_to_cpu(ondisk->snap_seq);
772 struct rbd_image_snap_ondisk *snaps;
773 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
775 /* We'll keep a copy of the snapshot names... */
777 if (snap_names_len > (u64)SIZE_MAX)
779 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
783 /* ...as well as the array of their sizes. */
785 size = snap_count * sizeof (*header->snap_sizes);
786 snap_sizes = kmalloc(size, GFP_KERNEL);
791 * Copy the names, and fill in each snapshot's id
794 * Note that rbd_dev_v1_header_info() guarantees the
795 * ondisk buffer we're working with has
796 * snap_names_len bytes beyond the end of the
797 * snapshot id array, so this memcpy() is safe.
799 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
800 snaps = ondisk->snaps;
801 for (i = 0; i < snap_count; i++) {
802 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
803 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
807 /* We won't fail any more, fill in the header */
809 down_write(&rbd_dev->header_rwsem);
811 header->object_prefix = object_prefix;
812 header->obj_order = ondisk->options.order;
813 header->crypt_type = ondisk->options.crypt_type;
814 header->comp_type = ondisk->options.comp_type;
815 /* The rest aren't used for format 1 images */
816 header->stripe_unit = 0;
817 header->stripe_count = 0;
818 header->features = 0;
820 ceph_put_snap_context(header->snapc);
821 kfree(header->snap_names);
822 kfree(header->snap_sizes);
825 /* The remaining fields always get updated (when we refresh) */
827 header->image_size = le64_to_cpu(ondisk->image_size);
828 header->snapc = snapc;
829 header->snap_names = snap_names;
830 header->snap_sizes = snap_sizes;
832 /* Make sure mapping size is consistent with header info */
834 if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
835 if (rbd_dev->mapping.size != header->image_size)
836 rbd_dev->mapping.size = header->image_size;
838 up_write(&rbd_dev->header_rwsem);
846 ceph_put_snap_context(snapc);
847 kfree(object_prefix);
852 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
854 const char *snap_name;
856 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
858 /* Skip over names until we find the one we are looking for */
860 snap_name = rbd_dev->header.snap_names;
862 snap_name += strlen(snap_name) + 1;
864 return kstrdup(snap_name, GFP_KERNEL);
868 * Snapshot id comparison function for use with qsort()/bsearch().
869 * Note that result is for snapshots in *descending* order.
871 static int snapid_compare_reverse(const void *s1, const void *s2)
873 u64 snap_id1 = *(u64 *)s1;
874 u64 snap_id2 = *(u64 *)s2;
876 if (snap_id1 < snap_id2)
878 return snap_id1 == snap_id2 ? 0 : -1;
882 * Search a snapshot context to see if the given snapshot id is
885 * Returns the position of the snapshot id in the array if it's found,
886 * or BAD_SNAP_INDEX otherwise.
888 * Note: The snapshot array is kept sorted (by the osd) in
889 * reverse order, highest snapshot id first.
/*
 * Binary-searches the snapshot id array of the header's snapshot context
 * for snap_id, using snapid_compare_reverse() as the ordering, and turns
 * a hit into an array index by pointer subtraction from element 0.
 * Yields BAD_SNAP_INDEX when bsearch() finds nothing.
 */
891 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
893 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
/* Element size is taken from snap_id, i.e. sizeof (u64). */
896 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
897 sizeof (snap_id), snapid_compare_reverse);
899 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
902 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
907 which = rbd_dev_snap_index(rbd_dev, snap_id);
908 if (which == BAD_SNAP_INDEX)
911 return _rbd_dev_v1_snap_name(rbd_dev, which);
914 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
916 if (snap_id == CEPH_NOSNAP)
917 return RBD_SNAP_HEAD_NAME;
919 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
920 if (rbd_dev->image_format == 1)
921 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
923 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
926 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
929 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
930 if (snap_id == CEPH_NOSNAP) {
931 *snap_size = rbd_dev->header.image_size;
932 } else if (rbd_dev->image_format == 1) {
935 which = rbd_dev_snap_index(rbd_dev, snap_id);
936 if (which == BAD_SNAP_INDEX)
939 *snap_size = rbd_dev->header.snap_sizes[which];
944 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
953 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
956 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
957 if (snap_id == CEPH_NOSNAP) {
958 *snap_features = rbd_dev->header.features;
959 } else if (rbd_dev->image_format == 1) {
960 *snap_features = 0; /* No features for format 1 */
965 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
969 *snap_features = features;
974 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
976 u64 snap_id = rbd_dev->spec->snap_id;
981 ret = rbd_snap_size(rbd_dev, snap_id, &size);
984 ret = rbd_snap_features(rbd_dev, snap_id, &features);
988 rbd_dev->mapping.size = size;
989 rbd_dev->mapping.features = features;
994 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
996 rbd_dev->mapping.size = 0;
997 rbd_dev->mapping.features = 0;
1000 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1006 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1009 segment = offset >> rbd_dev->header.obj_order;
1010 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
1011 rbd_dev->header.object_prefix, segment);
1012 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
1013 pr_err("error formatting segment name for #%llu (%d)\n",
1022 static void rbd_segment_name_free(const char *name)
1024 /* The explicit cast here is needed to drop the const qualifier */
1026 kmem_cache_free(rbd_segment_name_cache, (void *)name);
1029 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1031 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1033 return offset & (segment_size - 1);
/*
 * Clamp a byte range to the segment that contains its first byte.
 * offset is first reduced to an offset within the segment (a segment is
 * 1 << obj_order bytes); if offset + length would run past the end of
 * the segment, length is cut back to the bytes remaining in it.
 */
1036 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1037 u64 offset, u64 length)
1039 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1041 offset &= segment_size - 1;
/* Guards the addition below against u64 wraparound. */
1043 rbd_assert(length <= U64_MAX - offset);
1044 if (offset + length > segment_size)
1045 length = segment_size - offset;
1051 * returns the size of an object in the image
1053 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1055 return 1 << header->obj_order;
1062 static void bio_chain_put(struct bio *chain)
1068 chain = chain->bi_next;
1074 * zeros a bio chain, starting at specific offset
1076 static void zero_bio_chain(struct bio *chain, int start_ofs)
1079 unsigned long flags;
1085 bio_for_each_segment(bv, chain, i) {
1086 if (pos + bv->bv_len > start_ofs) {
1087 int remainder = max(start_ofs - pos, 0);
1088 buf = bvec_kmap_irq(bv, &flags);
1089 memset(buf + remainder, 0,
1090 bv->bv_len - remainder);
1091 bvec_kunmap_irq(buf, &flags);
1096 chain = chain->bi_next;
1101 * similar to zero_bio_chain(), zeros data defined by a page array,
1102 * starting at the given byte offset from the start of the array and
1103 * continuing up to the given end offset. The pages array is
1104 * assumed to be big enough to hold all bytes up to the end.
1106 static void zero_pages(struct page **pages, u64 offset, u64 end)
/* Start at the page that holds the first byte to clear. */
1108 struct page **page = &pages[offset >> PAGE_SHIFT];
/* An empty range is not accepted, and its size must fit in a size_t. */
1110 rbd_assert(end > offset);
1111 rbd_assert(end - offset <= (u64)SIZE_MAX);
1112 while (offset < end) {
1115 unsigned long flags;
/*
 * Clear from the offset within this page up to the end of the page or
 * up to "end", whichever comes first.
 */
1118 page_offset = (size_t)(offset & ~PAGE_MASK);
1119 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
/* The page is mapped with kmap_atomic() while local interrupts are off. */
1120 local_irq_save(flags);
1121 kaddr = kmap_atomic(*page);
1122 memset(kaddr + page_offset, 0, length);
1123 kunmap_atomic(kaddr);
1124 local_irq_restore(flags);
1132 * Clone a portion of a bio, starting at the given byte offset
1133 * and continuing for the number of bytes indicated.
1135 static struct bio *bio_clone_range(struct bio *bio_src,
1136 unsigned int offset,
1144 unsigned short end_idx;
1145 unsigned short vcnt;
1148 /* Handle the easy case for the caller */
1150 if (!offset && len == bio_src->bi_size)
1151 return bio_clone(bio_src, gfpmask);
1153 if (WARN_ON_ONCE(!len))
1155 if (WARN_ON_ONCE(len > bio_src->bi_size))
1157 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1160 /* Find first affected segment... */
1163 __bio_for_each_segment(bv, bio_src, idx, 0) {
1164 if (resid < bv->bv_len)
1166 resid -= bv->bv_len;
1170 /* ...and the last affected segment */
1173 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1174 if (resid <= bv->bv_len)
1176 resid -= bv->bv_len;
1178 vcnt = end_idx - idx + 1;
1180 /* Build the clone */
1182 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1184 return NULL; /* ENOMEM */
1186 bio->bi_bdev = bio_src->bi_bdev;
1187 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1188 bio->bi_rw = bio_src->bi_rw;
1189 bio->bi_flags |= 1 << BIO_CLONED;
1192 * Copy over our part of the bio_vec, then update the first
1193 * and last (or only) entries.
1195 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1196 vcnt * sizeof (struct bio_vec));
1197 bio->bi_io_vec[0].bv_offset += voff;
1199 bio->bi_io_vec[0].bv_len -= voff;
1200 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1202 bio->bi_io_vec[0].bv_len = len;
1205 bio->bi_vcnt = vcnt;
1213 * Clone a portion of a bio chain, starting at the given byte offset
1214 * into the first bio in the source chain and continuing for the
1215 * number of bytes indicated. The result is another bio chain of
1216 * exactly the given length, or a null pointer on error.
1218 * The bio_src and offset parameters are both in-out. On entry they
1219 * refer to the first source bio and the offset into that bio where
1220 * the start of data to be cloned is located.
1222 * On return, bio_src is updated to refer to the bio in the source
1223 * chain that contains the first un-cloned byte, and *offset will
1224 * contain the offset of that byte within that bio.
1226 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1227 unsigned int *offset,
1231 struct bio *bi = *bio_src;
1232 unsigned int off = *offset;
1233 struct bio *chain = NULL;
1236 /* Build up a chain of clone bios up to the limit */
1238 if (!bi || off >= bi->bi_size || !len)
1239 return NULL; /* Nothing to clone */
1243 unsigned int bi_size;
1247 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1248 goto out_err; /* EINVAL; ran out of bio's */
1250 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1251 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1253 goto out_err; /* ENOMEM */
1256 end = &bio->bi_next;
1259 if (off == bi->bi_size) {
1270 bio_chain_put(chain);
1276 * The default/initial value for all object request flags is 0. For
1277 * each flag, once its value is set to 1 it is never reset to 0
1280 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1282 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1283 struct rbd_device *rbd_dev;
1285 rbd_dev = obj_request->img_request->rbd_dev;
1286 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1291 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1294 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1297 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1299 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1300 struct rbd_device *rbd_dev = NULL;
1302 if (obj_request_img_data_test(obj_request))
1303 rbd_dev = obj_request->img_request->rbd_dev;
1304 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1309 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1312 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1316 * This sets the KNOWN flag after (possibly) setting the EXISTS
1317 * flag. The latter is set based on the "exists" value provided.
1319 * Note that for our purposes once an object exists it never goes
1320 * away again. It's possible that the responses from two existence
1321 * checks are separated by the creation of the target object, and
1322 * the first ("doesn't exist") response arrives *after* the second
1323 * ("does exist"). In that case the late "doesn't exist" response is ignored.
1325 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1329 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1330 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1334 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1337 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1340 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1343 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1346 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1348 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1349 atomic_read(&obj_request->kref.refcount));
1350 kref_get(&obj_request->kref);
1353 static void rbd_obj_request_destroy(struct kref *kref);
1354 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1356 rbd_assert(obj_request != NULL);
1357 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1358 atomic_read(&obj_request->kref.refcount));
1359 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1362 static void rbd_img_request_destroy(struct kref *kref);
1363 static void rbd_img_request_put(struct rbd_img_request *img_request)
1365 rbd_assert(img_request != NULL);
1366 dout("%s: img %p (was %d)\n", __func__, img_request,
1367 atomic_read(&img_request->kref.refcount));
1368 kref_put(&img_request->kref, rbd_img_request_destroy);
/*
 * Attach an object request to an image request.  The object request
 * must not already belong to an image request.  It is given the next
 * position ("which") in the image request, flagged as carrying image
 * data, and appended to the image request's obj_requests list.  No
 * extra reference is taken; the image request takes over the object
 * request's original one.
 */
1371 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1372 struct rbd_obj_request *obj_request)
1374 rbd_assert(obj_request->img_request == NULL);
1376 /* Image request now owns object's original reference */
1377 obj_request->img_request = img_request;
1378 obj_request->which = img_request->obj_request_count;
1379 rbd_assert(!obj_request_img_data_test(obj_request));
1380 obj_request_img_data_set(obj_request);
1381 rbd_assert(obj_request->which != BAD_WHICH);
1382 img_request->obj_request_count++;
1383 list_add_tail(&obj_request->links, &img_request->obj_requests);
1384 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1385 obj_request->which);
/*
 * Detach an object request from its image request and drop the
 * reference the image request held on it.  The assertion on "which"
 * after the count is decremented means requests must be removed in
 * the reverse of the order in which they were added.  The object
 * request's img_request and callback pointers are cleared before
 * the reference is put.
 */
1388 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1389 struct rbd_obj_request *obj_request)
1391 rbd_assert(obj_request->which != BAD_WHICH);
1393 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1394 obj_request->which);
1395 list_del(&obj_request->links);
1396 rbd_assert(img_request->obj_request_count > 0);
1397 img_request->obj_request_count--;
1398 rbd_assert(obj_request->which == img_request->obj_request_count);
1399 obj_request->which = BAD_WHICH;
1400 rbd_assert(obj_request_img_data_test(obj_request));
1401 rbd_assert(obj_request->img_request == img_request);
1402 obj_request->img_request = NULL;
1403 obj_request->callback = NULL;
1404 rbd_obj_request_put(obj_request);
/*
 * Check an object request type against the three recognised values
 * (NODATA, BIO, PAGES).
 * NOTE(review): the switch header and return statements are not
 * visible in this listing; presumably these three cases yield true
 * and anything else false -- confirm against the full file.
 */
1407 static bool obj_request_type_valid(enum obj_request_type type)
1410 case OBJ_REQUEST_NODATA:
1411 case OBJ_REQUEST_BIO:
1412 case OBJ_REQUEST_PAGES:
/*
 * Hand an object request's osd request to the osd client.  Returns
 * whatever ceph_osdc_start_request() returns.
 */
1419 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1420 struct rbd_obj_request *obj_request)
1422 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1424 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
/*
 * Finish an image request.  When the image request recorded no
 * error, its xferred count is set to the sum of the xferred counts
 * of its object requests; on error img_request->xferred is left
 * untouched.  The image request's callback, if set, is then invoked.
 * NOTE(review): the original numbering skips a line between the
 * callback invocation and rbd_img_request_put(); the put is
 * presumably the "else" branch (a callback then owns the final put)
 * -- confirm against the full file.
 */
1427 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1430 dout("%s: img %p\n", __func__, img_request);
1433 * If no error occurred, compute the aggregate transfer
1434 * count for the image request. We could instead use
1435 * atomic64_cmpxchg() to update it as each object request
1436 * completes; not clear which way is better off hand.
1438 if (!img_request->result) {
1439 struct rbd_obj_request *obj_request;
1442 for_each_obj_request(img_request, obj_request)
1443 xferred += obj_request->xferred;
1444 img_request->xferred = xferred;
1447 if (img_request->callback)
1448 img_request->callback(img_request);
1450 rbd_img_request_put(img_request);
1453 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
/*
 * Block until the object request's completion is signalled.  The
 * wait is interruptible; the return value is that of
 * wait_for_completion_interruptible().
 */
1455 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1457 dout("%s: obj %p\n", __func__, obj_request);
1459 return wait_for_completion_interruptible(&obj_request->completion);
1463 * The default/initial value for all image request flags is 0. Each
1464 * is conditionally set to 1 at image request initialization time
1465 * and currently never changes thereafter.
/* Mark an image request as a write by setting IMG_REQ_WRITE. */
1467 static void img_request_write_set(struct rbd_img_request *img_request)
1469 set_bit(IMG_REQ_WRITE, &img_request->flags);
/* Report whether IMG_REQ_WRITE is set, i.e. the request is a write. */
1473 static bool img_request_write_test(struct rbd_img_request *img_request)
1476 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
/* Mark an image request as a child request by setting IMG_REQ_CHILD. */
1479 static void img_request_child_set(struct rbd_img_request *img_request)
1481 set_bit(IMG_REQ_CHILD, &img_request->flags);
/* Report whether IMG_REQ_CHILD is set on the image request. */
1485 static bool img_request_child_test(struct rbd_img_request *img_request)
1488 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
/* Mark an image request as layered by setting IMG_REQ_LAYERED. */
1491 static void img_request_layered_set(struct rbd_img_request *img_request)
1493 set_bit(IMG_REQ_LAYERED, &img_request->flags);
/* Report whether IMG_REQ_LAYERED is set on the image request. */
1497 static bool img_request_layered_test(struct rbd_img_request *img_request)
1500 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
/*
 * Post-process a completed read that carries image data.  The
 * request must carry data (type is not OBJ_REQUEST_NODATA).
 *
 * A result of -ENOENT is converted to success with the data buffer
 * zeroed and xferred set to the full length.  A successful short
 * read has its tail (from xferred onward) zeroed and xferred raised
 * to the full length.  The request is then marked done.
 * NOTE(review): the "else" lines that select zero_pages() over
 * zero_bio_chain() are not visible in this listing.
 */
1504 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1506 u64 xferred = obj_request->xferred;
1507 u64 length = obj_request->length;
1509 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1510 obj_request, obj_request->img_request, obj_request->result,
1513 * ENOENT means a hole in the image. We zero-fill the
1514 * entire length of the request. A short read also implies
1515 * zero-fill to the end of the request. Either way we
1516 * update the xferred count to indicate the whole request
1519 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1520 if (obj_request->result == -ENOENT) {
1521 if (obj_request->type == OBJ_REQUEST_BIO)
1522 zero_bio_chain(obj_request->bio_list, 0);
1524 zero_pages(obj_request->pages, 0, length);
1525 obj_request->result = 0;
1526 obj_request->xferred = length;
1527 } else if (xferred < length && !obj_request->result) {
1528 if (obj_request->type == OBJ_REQUEST_BIO)
1529 zero_bio_chain(obj_request->bio_list, xferred);
1531 zero_pages(obj_request->pages, xferred, length);
1532 obj_request->xferred = length;
1534 obj_request_done_set(obj_request);
/*
 * Signal completion of an object request: invoke its callback when
 * one is set.
 * NOTE(review): the original numbering skips a line between the
 * callback invocation and complete_all(); complete_all() is
 * presumably the "else" branch, used only when there is no callback
 * -- confirm against the full file.
 */
1537 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1539 dout("%s: obj %p cb %p\n", __func__, obj_request,
1540 obj_request->callback);
1541 if (obj_request->callback)
1542 obj_request->callback(obj_request);
1544 complete_all(&obj_request->completion);
/*
 * Callback for ops that need no post-processing: just mark the
 * object request done.
 */
1547 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1549 dout("%s: obj %p\n", __func__, obj_request);
1550 obj_request_done_set(obj_request);
/*
 * Handle completion of a READ op.  For a request that carries image
 * data, the owning image request and its device are looked up.  A
 * layered read that hit -ENOENT at an image offset inside the
 * parent overlap is redirected to the parent image via
 * rbd_img_parent_read().  Otherwise, a request with an image request
 * goes through rbd_img_obj_request_read_callback().
 * NOTE(review): the original numbering skips a line before the
 * final obj_request_done_set(); it is presumably a last "else"
 * branch for requests with no image request -- confirm.
 */
1553 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1555 struct rbd_img_request *img_request = NULL;
1556 struct rbd_device *rbd_dev = NULL;
1557 bool layered = false;
1559 if (obj_request_img_data_test(obj_request)) {
1560 img_request = obj_request->img_request;
1561 layered = img_request && img_request_layered_test(img_request);
1562 rbd_dev = img_request->rbd_dev;
1565 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1566 obj_request, img_request, obj_request->result,
1567 obj_request->xferred, obj_request->length);
1568 if (layered && obj_request->result == -ENOENT &&
1569 obj_request->img_offset < rbd_dev->parent_overlap)
1570 rbd_img_parent_read(obj_request);
1571 else if (img_request)
1572 rbd_img_obj_request_read_callback(obj_request);
1574 obj_request_done_set(obj_request);
/*
 * Handle completion of a WRITE op: report the full requested length
 * as transferred and mark the object request done.
 */
1577 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1579 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1580 obj_request->result, obj_request->length);
1582 * There is no such thing as a successful short write. Set
1583 * it to our originally-requested length.
1585 obj_request->xferred = obj_request->length;
1586 obj_request_done_set(obj_request);
1590 * For a simple stat call there's nothing to do. We'll do more if
1591 * this is part of a write sequence for a layered image.
/* Handle completion of a STAT op: mark the object request done. */
1593 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1595 dout("%s: obj %p\n", __func__, obj_request);
1596 obj_request_done_set(obj_request);
/*
 * Completion callback installed on every osd request built in this
 * file (see r_callback assignments).  The owning object request is
 * recovered from osd_req->r_priv.  A negative osd result is copied
 * into the object request, the reply length of op 0 becomes its
 * xferred count, and the opcode of op 0 selects the op-specific
 * handler.  If that handler marked the request done,
 * rbd_obj_request_complete() is called.
 * NOTE(review): the "switch"/"break"/"default" lines are not visible
 * in this listing; the rbd_warn() is presumably the default case.
 */
1599 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1600 struct ceph_msg *msg)
1602 struct rbd_obj_request *obj_request = osd_req->r_priv;
1605 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1606 rbd_assert(osd_req == obj_request->osd_req);
1607 if (obj_request_img_data_test(obj_request)) {
1608 rbd_assert(obj_request->img_request);
1609 rbd_assert(obj_request->which != BAD_WHICH);
1611 rbd_assert(obj_request->which == BAD_WHICH);
1614 if (osd_req->r_result < 0)
1615 obj_request->result = osd_req->r_result;
1617 BUG_ON(osd_req->r_num_ops > 2);
1620 * We support a 64-bit length, but ultimately it has to be
1621 * passed to blk_end_request(), which takes an unsigned int.
1623 obj_request->xferred = osd_req->r_reply_op_len[0];
1624 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1625 opcode = osd_req->r_ops[0].op;
1627 case CEPH_OSD_OP_READ:
1628 rbd_osd_read_callback(obj_request);
1630 case CEPH_OSD_OP_WRITE:
1631 rbd_osd_write_callback(obj_request);
1633 case CEPH_OSD_OP_STAT:
1634 rbd_osd_stat_callback(obj_request);
1636 case CEPH_OSD_OP_CALL:
1637 case CEPH_OSD_OP_NOTIFY_ACK:
1638 case CEPH_OSD_OP_WATCH:
1639 rbd_osd_trivial_callback(obj_request);
1642 rbd_warn(NULL, "%s: unsupported op %hu\n",
1643 obj_request->object_name, (unsigned short) opcode);
1647 if (obj_request_done_test(obj_request))
1648 rbd_obj_request_complete(obj_request);
/*
 * Finalize an osd request for a read.  The snapshot id comes from
 * the owning image request when there is one, and is CEPH_NOSNAP
 * otherwise.  No snapshot context or mtime is supplied.
 */
1651 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1653 struct rbd_img_request *img_request = obj_request->img_request;
1654 struct ceph_osd_request *osd_req = obj_request->osd_req;
1657 rbd_assert(osd_req != NULL);
1659 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1660 ceph_osdc_build_request(osd_req, obj_request->offset,
1661 NULL, snap_id, NULL);
/*
 * Finalize an osd request for a write.  The snapshot context comes
 * from the owning image request when there is one (NULL otherwise),
 * the snapshot id is CEPH_NOSNAP, and the current time is supplied
 * as the modification time.
 */
1664 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1666 struct rbd_img_request *img_request = obj_request->img_request;
1667 struct ceph_osd_request *osd_req = obj_request->osd_req;
1668 struct ceph_snap_context *snapc;
1669 struct timespec mtime = CURRENT_TIME;
1671 rbd_assert(osd_req != NULL);
1673 snapc = img_request ? img_request->snapc : NULL;
1674 ceph_osdc_build_request(osd_req, obj_request->offset,
1675 snapc, CEPH_NOSNAP, &mtime);
/*
 * Allocate a single-op osd request for an object request and fill
 * in its flags, callback, private pointer, object name and file
 * layout.  For a request carrying image data, the write/read
 * direction must agree with the image request (asserted) and the
 * image request's snapshot context is passed to the allocator.
 * Allocation uses GFP_ATOMIC.  Returns NULL on allocation failure.
 * NOTE(review): the write_request parameter line, the conditions
 * that select the WRITE|ONDISK flags over the READ flag, and the
 * final return are not visible in this listing.
 */
1678 static struct ceph_osd_request *rbd_osd_req_create(
1679 struct rbd_device *rbd_dev,
1681 struct rbd_obj_request *obj_request)
1683 struct ceph_snap_context *snapc = NULL;
1684 struct ceph_osd_client *osdc;
1685 struct ceph_osd_request *osd_req;
1687 if (obj_request_img_data_test(obj_request)) {
1688 struct rbd_img_request *img_request = obj_request->img_request;
1690 rbd_assert(write_request ==
1691 img_request_write_test(img_request));
1693 snapc = img_request->snapc;
1696 /* Allocate and initialize the request, for the single op */
1698 osdc = &rbd_dev->rbd_client->client->osdc;
1699 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1701 return NULL; /* ENOMEM */
1704 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1706 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1708 osd_req->r_callback = rbd_osd_req_callback;
1709 osd_req->r_priv = obj_request;
1711 osd_req->r_oid_len = strlen(obj_request->object_name);
1712 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1713 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1715 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1721 * Create a copyup osd request based on the information in the
1722 * object request supplied. A copyup request has two osd ops,
1723 * a copyup method call, and a "normal" write request.
/*
 * Allocate a two-op osd request for a copyup.  The object request
 * must carry image data and belong to a write image request (both
 * asserted).  The snapshot context and device come from that image
 * request.  Flags are always WRITE|ONDISK.  Allocation uses
 * GFP_ATOMIC; returns NULL on allocation failure.
 */
1725 static struct ceph_osd_request *
1726 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1728 struct rbd_img_request *img_request;
1729 struct ceph_snap_context *snapc;
1730 struct rbd_device *rbd_dev;
1731 struct ceph_osd_client *osdc;
1732 struct ceph_osd_request *osd_req;
1734 rbd_assert(obj_request_img_data_test(obj_request));
1735 img_request = obj_request->img_request;
1736 rbd_assert(img_request);
1737 rbd_assert(img_request_write_test(img_request));
1739 /* Allocate and initialize the request, for the two ops */
1741 snapc = img_request->snapc;
1742 rbd_dev = img_request->rbd_dev;
1743 osdc = &rbd_dev->rbd_client->client->osdc;
1744 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1746 return NULL; /* ENOMEM */
1748 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1749 osd_req->r_callback = rbd_osd_req_callback;
1750 osd_req->r_priv = obj_request;
1752 osd_req->r_oid_len = strlen(obj_request->object_name);
1753 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1754 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1756 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Release an osd request by dropping a reference via the osd client. */
1762 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1764 ceph_osdc_put_request(osd_req);
1767 /* object_name is assumed to be a non-null pointer and NUL-terminated */
/*
 * Allocate and initialize an object request for the byte range
 * [offset, offset + length) of the named object.  The name is
 * copied into a private buffer, so the caller keeps ownership of
 * "object_name".  The new request starts with cleared flags, no
 * position in any image request (BAD_WHICH), and a single
 * reference.  Both allocations use GFP_KERNEL.
 * NOTE(review): the allocation-failure checks and the return
 * statement are not visible in this listing.
 */
1769 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1770 u64 offset, u64 length,
1771 enum obj_request_type type)
1773 struct rbd_obj_request *obj_request;
1777 rbd_assert(obj_request_type_valid(type));
1779 size = strlen(object_name) + 1;
1780 name = kmalloc(size, GFP_KERNEL);
1784 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1790 obj_request->object_name = memcpy(name, object_name, size);
1791 obj_request->offset = offset;
1792 obj_request->length = length;
1793 obj_request->flags = 0;
1794 obj_request->which = BAD_WHICH;
1795 obj_request->type = type;
1796 INIT_LIST_HEAD(&obj_request->links);
1797 init_completion(&obj_request->completion);
1798 kref_init(&obj_request->kref);
1800 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1801 offset, length, (int)type, obj_request);
/*
 * kref release function for object requests.  The request must
 * already be detached from any image request (img_request NULL and
 * which == BAD_WHICH, both asserted).  Releases the osd request if
 * one is attached, then the data the request holds according to its
 * type: the bio chain for BIO requests, or the page vector for
 * PAGES requests.  A holder that only borrowed a page array must
 * therefore clear ->pages before the final put.  Finally frees the
 * name copy and the request itself.
 */
1806 static void rbd_obj_request_destroy(struct kref *kref)
1808 struct rbd_obj_request *obj_request;
1810 obj_request = container_of(kref, struct rbd_obj_request, kref);
1812 dout("%s: obj %p\n", __func__, obj_request);
1814 rbd_assert(obj_request->img_request == NULL);
1815 rbd_assert(obj_request->which == BAD_WHICH);
1817 if (obj_request->osd_req)
1818 rbd_osd_req_destroy(obj_request->osd_req);
1820 rbd_assert(obj_request_type_valid(obj_request->type));
1821 switch (obj_request->type) {
1822 case OBJ_REQUEST_NODATA:
1823 break; /* Nothing to do */
1824 case OBJ_REQUEST_BIO:
1825 if (obj_request->bio_list)
1826 bio_chain_put(obj_request->bio_list);
1828 case OBJ_REQUEST_PAGES:
1829 if (obj_request->pages)
1830 ceph_release_page_vector(obj_request->pages,
1831 obj_request->page_count);
1835 kfree(obj_request->object_name);
1836 obj_request->object_name = NULL;
1837 kmem_cache_free(rbd_obj_request_cache, obj_request);
1841 * Caller is responsible for filling in the list of object requests
1842 * that comprises the image request, and the Linux request pointer
1843 * (if there is one).
1845 static struct rbd_img_request *rbd_img_request_create(
1846 struct rbd_device *rbd_dev,
1847 u64 offset, u64 length,
1851 struct rbd_img_request *img_request;
1853 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1857 if (write_request) {
1858 down_read(&rbd_dev->header_rwsem);
1859 ceph_get_snap_context(rbd_dev->header.snapc);
1860 up_read(&rbd_dev->header_rwsem);
1863 img_request->rq = NULL;
1864 img_request->rbd_dev = rbd_dev;
1865 img_request->offset = offset;
1866 img_request->length = length;
1867 img_request->flags = 0;
1868 if (write_request) {
1869 img_request_write_set(img_request);
1870 img_request->snapc = rbd_dev->header.snapc;
1872 img_request->snap_id = rbd_dev->spec->snap_id;
1875 img_request_child_set(img_request);
1876 if (rbd_dev->parent_overlap)
1877 img_request_layered_set(img_request);
1878 spin_lock_init(&img_request->completion_lock);
1879 img_request->next_completion = 0;
1880 img_request->callback = NULL;
1881 img_request->result = 0;
1882 img_request->obj_request_count = 0;
1883 INIT_LIST_HEAD(&img_request->obj_requests);
1884 kref_init(&img_request->kref);
1886 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1887 write_request ? "write" : "read", offset, length,
/*
 * kref release function for image requests.  Detaches every object
 * request via rbd_img_obj_request_del() (which also drops the image
 * request's reference on each).  It then drops the snapshot context
 * reference held by a write request, and drops the reference on
 * img_request->obj_request held by a child request.  That pointer
 * must be valid for any child request: rbd_obj_request_put()
 * asserts it is non-NULL.  Finally frees the image request.
 */
1893 static void rbd_img_request_destroy(struct kref *kref)
1895 struct rbd_img_request *img_request;
1896 struct rbd_obj_request *obj_request;
1897 struct rbd_obj_request *next_obj_request;
1899 img_request = container_of(kref, struct rbd_img_request, kref);
1901 dout("%s: img %p\n", __func__, img_request);
1903 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1904 rbd_img_obj_request_del(img_request, obj_request);
1905 rbd_assert(img_request->obj_request_count == 0);
1907 if (img_request_write_test(img_request))
1908 ceph_put_snap_context(img_request->snapc);
1910 if (img_request_child_test(img_request))
1911 rbd_obj_request_put(img_request->obj_request);
1913 kmem_cache_free(rbd_img_request_cache, img_request);
/*
 * Account for one completed object request within its image
 * request.  An error is logged via rbd_warn() and becomes the image
 * request's result if none was recorded before.  A PAGES request
 * has its page array pointer cleared, since the array is not owned
 * by the object request.  The "more" value says whether further
 * object requests remain.  For a child image request it is derived
 * from the request's position; otherwise it is whatever
 * blk_end_request() returns for the block-layer request.
 * NOTE(review): the "if (result)" guard, the "else" between the two
 * branches and the final return are not visible in this listing.
 */
1916 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1918 struct rbd_img_request *img_request;
1919 unsigned int xferred;
1923 rbd_assert(obj_request_img_data_test(obj_request));
1924 img_request = obj_request->img_request;
1926 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1927 xferred = (unsigned int)obj_request->xferred;
1928 result = obj_request->result;
1930 struct rbd_device *rbd_dev = img_request->rbd_dev;
1932 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1933 img_request_write_test(img_request) ? "write" : "read",
1934 obj_request->length, obj_request->img_offset,
1935 obj_request->offset);
1936 rbd_warn(rbd_dev, " result %d xferred %x\n",
1938 if (!img_request->result)
1939 img_request->result = result;
1942 /* Image object requests don't own their page array */
1944 if (obj_request->type == OBJ_REQUEST_PAGES) {
1945 obj_request->pages = NULL;
1946 obj_request->page_count = 0;
1949 if (img_request_child_test(img_request)) {
1950 rbd_assert(img_request->obj_request != NULL);
1951 more = obj_request->which < img_request->obj_request_count - 1;
1953 rbd_assert(img_request->rq != NULL);
1954 more = blk_end_request(img_request->rq, result, xferred);
/*
 * Completion callback for an object request that belongs to an
 * image request.  Object requests are ended strictly in position
 * order.  Under completion_lock, if this request is the next one
 * expected (next_completion), the list is walked forward from it
 * and each request already marked done is ended via
 * rbd_img_obj_end_request().  next_completion is then advanced to
 * the first request that is not yet done.
 * NOTE(review): the early exit when this is not the next
 * completion, the loop's break/increment lines, and the condition
 * guarding rbd_img_request_complete() are not visible in this
 * listing; presumably the image request is completed only when no
 * more object requests remain -- confirm against the full file.
 */
1960 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1962 struct rbd_img_request *img_request;
1963 u32 which = obj_request->which;
1966 rbd_assert(obj_request_img_data_test(obj_request));
1967 img_request = obj_request->img_request;
1969 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1970 rbd_assert(img_request != NULL);
1971 rbd_assert(img_request->obj_request_count > 0);
1972 rbd_assert(which != BAD_WHICH);
1973 rbd_assert(which < img_request->obj_request_count);
1974 rbd_assert(which >= img_request->next_completion);
1976 spin_lock_irq(&img_request->completion_lock);
1977 if (which != img_request->next_completion)
1980 for_each_obj_request_from(img_request, obj_request) {
1982 rbd_assert(which < img_request->obj_request_count);
1984 if (!obj_request_done_test(obj_request))
1986 more = rbd_img_obj_end_request(obj_request);
1990 rbd_assert(more ^ (which == img_request->obj_request_count));
1991 img_request->next_completion = which;
1993 spin_unlock_irq(&img_request->completion_lock);
1996 rbd_img_request_complete(img_request);
2000 * Split up an image request into one or more object requests, each
2001 * to a different object. The "type" parameter indicates whether
2002 * "data_desc" is the pointer to the head of a list of bio
2003 * structures, or the base of a page array. In either case this
2004 * function assumes data_desc describes memory sufficient to hold
2005 * all data described by the image request.
2007 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2008 enum obj_request_type type,
2011 struct rbd_device *rbd_dev = img_request->rbd_dev;
2012 struct rbd_obj_request *obj_request = NULL;
2013 struct rbd_obj_request *next_obj_request;
2014 bool write_request = img_request_write_test(img_request);
2015 struct bio *bio_list;
2016 unsigned int bio_offset = 0;
2017 struct page **pages;
2022 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2023 (int)type, data_desc);
2025 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2026 img_offset = img_request->offset;
2027 resid = img_request->length;
2028 rbd_assert(resid > 0);
2030 if (type == OBJ_REQUEST_BIO) {
2031 bio_list = data_desc;
2032 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2034 rbd_assert(type == OBJ_REQUEST_PAGES);
2039 struct ceph_osd_request *osd_req;
2040 const char *object_name;
2044 object_name = rbd_segment_name(rbd_dev, img_offset);
2047 offset = rbd_segment_offset(rbd_dev, img_offset);
2048 length = rbd_segment_length(rbd_dev, img_offset, resid);
2049 obj_request = rbd_obj_request_create(object_name,
2050 offset, length, type);
2051 /* object request has its own copy of the object name */
2052 rbd_segment_name_free(object_name);
2056 if (type == OBJ_REQUEST_BIO) {
2057 unsigned int clone_size;
2059 rbd_assert(length <= (u64)UINT_MAX);
2060 clone_size = (unsigned int)length;
2061 obj_request->bio_list =
2062 bio_chain_clone_range(&bio_list,
2066 if (!obj_request->bio_list)
2069 unsigned int page_count;
2071 obj_request->pages = pages;
2072 page_count = (u32)calc_pages_for(offset, length);
2073 obj_request->page_count = page_count;
2074 if ((offset + length) & ~PAGE_MASK)
2075 page_count--; /* more on last page */
2076 pages += page_count;
2079 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2083 obj_request->osd_req = osd_req;
2084 obj_request->callback = rbd_img_obj_callback;
2086 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2088 if (type == OBJ_REQUEST_BIO)
2089 osd_req_op_extent_osd_data_bio(osd_req, 0,
2090 obj_request->bio_list, length);
2092 osd_req_op_extent_osd_data_pages(osd_req, 0,
2093 obj_request->pages, length,
2094 offset & ~PAGE_MASK, false, false);
2097 rbd_osd_req_format_write(obj_request);
2099 rbd_osd_req_format_read(obj_request);
2101 obj_request->img_offset = img_offset;
2102 rbd_img_obj_request_add(img_request, obj_request);
2104 img_offset += length;
2111 rbd_obj_request_put(obj_request);
2113 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2114 rbd_obj_request_put(obj_request);
/*
 * Completion callback for a copyup request (installed by
 * rbd_img_obj_parent_read_full_callback()).  Takes the copyup page
 * vector away from the object request and releases it; the vector
 * must be present (asserted).  On success the transfer count is set
 * to the full requested length.  Processing then continues in the
 * normal image object callback.
 */
2120 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2122 struct rbd_img_request *img_request;
2123 struct rbd_device *rbd_dev;
2124 struct page **pages;
2127 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128 rbd_assert(obj_request_img_data_test(obj_request));
2129 img_request = obj_request->img_request;
2130 rbd_assert(img_request);
2132 rbd_dev = img_request->rbd_dev;
2133 rbd_assert(rbd_dev);
2135 pages = obj_request->copyup_pages;
2136 rbd_assert(pages != NULL);
2137 obj_request->copyup_pages = NULL;
2138 page_count = obj_request->copyup_page_count;
2139 rbd_assert(page_count);
2140 obj_request->copyup_page_count = 0;
2141 ceph_release_page_vector(pages, page_count);
2144 * We want the transfer count to reflect the size of the
2145 * original write request. There is no such thing as a
2146 * successful short write, so if the request was successful
2147 * we can just set it to the originally-requested length.
2149 if (!obj_request->result)
2150 obj_request->xferred = obj_request->length;
2152 /* Finish up with the normal image object callback */
2154 rbd_img_obj_callback(obj_request);
2158 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2160 struct rbd_obj_request *orig_request;
2161 struct ceph_osd_request *osd_req;
2162 struct ceph_osd_client *osdc;
2163 struct rbd_device *rbd_dev;
2164 struct page **pages;
2171 rbd_assert(img_request_child_test(img_request));
2173 /* First get what we need from the image request */
2175 pages = img_request->copyup_pages;
2176 rbd_assert(pages != NULL);
2177 img_request->copyup_pages = NULL;
2178 page_count = img_request->copyup_page_count;
2179 rbd_assert(page_count);
2180 img_request->copyup_page_count = 0;
2182 orig_request = img_request->obj_request;
2183 rbd_assert(orig_request != NULL);
2184 rbd_assert(obj_request_type_valid(orig_request->type));
2185 result = img_request->result;
2186 parent_length = img_request->length;
2187 rbd_assert(parent_length == img_request->xferred);
2188 rbd_img_request_put(img_request);
2190 rbd_assert(orig_request->img_request);
2191 rbd_dev = orig_request->img_request->rbd_dev;
2192 rbd_assert(rbd_dev);
2198 * The original osd request is of no use to use any more.
2199 * We need a new one that can hold the two ops in a copyup
2200 * request. Allocate the new copyup osd request for the
2201 * original request, and release the old one.
2204 osd_req = rbd_osd_req_create_copyup(orig_request);
2207 rbd_osd_req_destroy(orig_request->osd_req);
2208 orig_request->osd_req = osd_req;
2209 orig_request->copyup_pages = pages;
2210 orig_request->copyup_page_count = page_count;
2212 /* Initialize the copyup op */
2214 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2215 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2218 /* Then the original write request op */
2220 offset = orig_request->offset;
2221 length = orig_request->length;
2222 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2223 offset, length, 0, 0);
2224 if (orig_request->type == OBJ_REQUEST_BIO)
2225 osd_req_op_extent_osd_data_bio(osd_req, 1,
2226 orig_request->bio_list, length);
2228 osd_req_op_extent_osd_data_pages(osd_req, 1,
2229 orig_request->pages, length,
2230 offset & ~PAGE_MASK, false, false);
2232 rbd_osd_req_format_write(orig_request);
2234 /* All set, send it off. */
2236 orig_request->callback = rbd_img_obj_copyup_callback;
2237 osdc = &rbd_dev->rbd_client->client->osdc;
2238 result = rbd_obj_request_submit(osdc, orig_request);
2242 /* Record the error code and complete the request */
2244 orig_request->result = result;
2245 orig_request->xferred = 0;
2246 obj_request_done_set(orig_request);
2247 rbd_obj_request_complete(orig_request);
2251 * Read from the parent image the range of data that covers the
2252 * entire target of the given object request. This is used for
2253 * satisfying a layered image write request when the target of an
2254 * object request from the image request does not exist.
2256 * A page array big enough to hold the returned data is allocated
2257 * and supplied to rbd_img_request_fill() as the "data descriptor."
2258 * When the read completes, this page array will be transferred to
2259 * the original object request for the copyup operation.
2261 * If an error occurs, record it as the result of the original
2262 * object request and mark it done so it gets completed.
2264 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2266 struct rbd_img_request *img_request = NULL;
2267 struct rbd_img_request *parent_request = NULL;
2268 struct rbd_device *rbd_dev;
2271 struct page **pages = NULL;
2275 rbd_assert(obj_request_img_data_test(obj_request));
2276 rbd_assert(obj_request_type_valid(obj_request->type));
2278 img_request = obj_request->img_request;
2279 rbd_assert(img_request != NULL);
2280 rbd_dev = img_request->rbd_dev;
2281 rbd_assert(rbd_dev->parent != NULL);
2284 * Determine the byte range covered by the object in the
2285 * child image to which the original request was to be sent.
2287 img_offset = obj_request->img_offset - obj_request->offset;
2288 length = (u64)1 << rbd_dev->header.obj_order;
2291 * There is no defined parent data beyond the parent
2292 * overlap, so limit what we read at that boundary if
2295 if (img_offset + length > rbd_dev->parent_overlap) {
2296 rbd_assert(img_offset < rbd_dev->parent_overlap);
2297 length = rbd_dev->parent_overlap - img_offset;
2301 * Allocate a page array big enough to receive the data read
2304 page_count = (u32)calc_pages_for(0, length);
2305 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306 if (IS_ERR(pages)) {
2307 result = PTR_ERR(pages);
2313 parent_request = rbd_img_request_create(rbd_dev->parent,
2316 if (!parent_request)
2318 rbd_obj_request_get(obj_request);
2319 parent_request->obj_request = obj_request;
2321 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2324 parent_request->copyup_pages = pages;
2325 parent_request->copyup_page_count = page_count;
2327 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2328 result = rbd_img_request_submit(parent_request);
2332 parent_request->copyup_pages = NULL;
2333 parent_request->copyup_page_count = 0;
2334 parent_request->obj_request = NULL;
2335 rbd_obj_request_put(obj_request);
2338 ceph_release_page_vector(pages, page_count);
2340 rbd_img_request_put(parent_request);
2341 obj_request->result = result;
2342 obj_request->xferred = 0;
2343 obj_request_done_set(obj_request);
/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  The original object request and the
 * STAT result are taken from the stat request, whose back pointer
 * is cleared before the stat request is put.  A result of -ENOENT
 * records the target as known not to exist.  Any other non-zero
 * result is copied into the original request as its error.  The
 * original request is then resubmitted via
 * rbd_img_obj_request_submit(); if that fails it is completed at
 * once.  Finally the reference taken on the original request at
 * submit time is dropped.
 * NOTE(review): the condition preceding the "exists = true" branch
 * and the jump taken on other errors are not visible in this
 * listing; presumably a zero result means the object exists --
 * confirm.
 */
2348 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2350 struct rbd_obj_request *orig_request;
2353 rbd_assert(!obj_request_img_data_test(obj_request));
2356 * All we need from the object request is the original
2357 * request and the result of the STAT op. Grab those, then
2358 * we're done with the request.
2360 orig_request = obj_request->obj_request;
2361 obj_request->obj_request = NULL;
2362 rbd_assert(orig_request);
2363 rbd_assert(orig_request->img_request);
2365 result = obj_request->result;
2366 obj_request->result = 0;
2368 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2369 obj_request, orig_request, result,
2370 obj_request->xferred, obj_request->length);
2371 rbd_obj_request_put(obj_request);
2373 rbd_assert(orig_request);
2374 rbd_assert(orig_request->img_request);
2377 * Our only purpose here is to determine whether the object
2378 * exists, and we don't want to treat the non-existence as
2379 * an error. If something else comes back, transfer the
2380 * error to the original request and complete it now.
2383 obj_request_existence_set(orig_request, true);
2384 } else if (result == -ENOENT) {
2385 obj_request_existence_set(orig_request, false);
2386 } else if (result) {
2387 orig_request->result = result;
2392 * Resubmit the original request now that we have recorded
2393 * whether the target object exists.
2395 orig_request->result = rbd_img_obj_request_submit(orig_request);
2397 if (orig_request->result)
2398 rbd_obj_request_complete(orig_request);
2399 rbd_obj_request_put(orig_request);
2402 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2404 struct rbd_obj_request *stat_request;
2405 struct rbd_device *rbd_dev;
2406 struct ceph_osd_client *osdc;
2407 struct page **pages = NULL;
2413 * The response data for a STAT call consists of:
2420 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2421 page_count = (u32)calc_pages_for(0, size);
2422 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2424 return PTR_ERR(pages);
2427 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2432 rbd_obj_request_get(obj_request);
2433 stat_request->obj_request = obj_request;
2434 stat_request->pages = pages;
2435 stat_request->page_count = page_count;
2437 rbd_assert(obj_request->img_request);
2438 rbd_dev = obj_request->img_request->rbd_dev;
2439 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2441 if (!stat_request->osd_req)
2443 stat_request->callback = rbd_img_obj_exists_callback;
2445 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2446 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2448 rbd_osd_req_format_read(stat_request);
2450 osdc = &rbd_dev->rbd_client->client->osdc;
2451 ret = rbd_obj_request_submit(osdc, stat_request);
2454 rbd_obj_request_put(obj_request);
/*
 * Submit one object request of an image request, choosing among
 * three paths.  Reads, non-layered writes, writes at or beyond the
 * parent overlap, and writes to a target known to exist are sent
 * straight to the osd client.  Otherwise the request is a layered
 * write whose target may be missing.  If its existence state is
 * already known, the parent data is read first
 * (rbd_img_obj_parent_read_full()); if not, a STAT is issued to
 * find out (rbd_img_obj_exists_submit()).
 * "known" is assigned inside the condition, so it is only
 * meaningful once the three cheaper tests have all failed.
 * NOTE(review): the "if (known)" line selecting the parent read is
 * not visible in this listing.
 */
2459 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2461 struct rbd_img_request *img_request;
2462 struct rbd_device *rbd_dev;
2465 rbd_assert(obj_request_img_data_test(obj_request));
2467 img_request = obj_request->img_request;
2468 rbd_assert(img_request);
2469 rbd_dev = img_request->rbd_dev;
2472 * Only writes to layered images need special handling.
2473 * Reads and non-layered writes are simple object requests.
2474 * Layered writes that start beyond the end of the overlap
2475 * with the parent have no parent data, so they too are
2476 * simple object requests. Finally, if the target object is
2477 * known to already exist, its parent data has already been
2478 * copied, so a write to the object can also be handled as a
2479 * simple object request.
2481 if (!img_request_write_test(img_request) ||
2482 !img_request_layered_test(img_request) ||
2483 rbd_dev->parent_overlap <= obj_request->img_offset ||
2484 ((known = obj_request_known_test(obj_request)) &&
2485 obj_request_exists_test(obj_request))) {
2487 struct rbd_device *rbd_dev;
2488 struct ceph_osd_client *osdc;
2490 rbd_dev = obj_request->img_request->rbd_dev;
2491 osdc = &rbd_dev->rbd_client->client->osdc;
2493 return rbd_obj_request_submit(osdc, obj_request);
2497 * It's a layered write. The target object might exist but
2498 * we may not know that yet. If we know it doesn't exist,
2499 * start by reading the data for the full target object from
2500 * the parent so we can use it for a copyup to the target.
2503 return rbd_img_obj_parent_read_full(obj_request);
2505 /* We don't know whether the target exists. Go find out. */
2507 return rbd_img_obj_exists_submit(obj_request);
/*
 * Submit the object requests of an image request.
 *
 * Walks the image request with for_each_obj_request_safe() and passes
 * each object request to rbd_img_obj_request_submit(), keeping the
 * result in ret.
 * NOTE(review): presumably stops at and returns the first non-zero
 * result -- confirm.
 */
2510 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2512 struct rbd_obj_request *obj_request;
2513 struct rbd_obj_request *next_obj_request;
2515 dout("%s: img %p\n", __func__, img_request);
2516 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2519 ret = rbd_img_obj_request_submit(obj_request);
/*
 * Completion callback for an image request that was issued against the
 * parent image on behalf of an object request (see
 * rbd_img_parent_read(), which installs it).
 *
 * Copies the image request's result into the originating object
 * request.  The transfer count is copied as well, but when the object
 * request extends past rbd_dev->parent_overlap it is limited to the
 * number of bytes that lie below the overlap boundary.  Finally the
 * image request is put and the object request goes through
 * rbd_img_obj_request_read_callback() and rbd_obj_request_complete().
 */
2527 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2529 struct rbd_obj_request *obj_request;
2530 struct rbd_device *rbd_dev;
2533 rbd_assert(img_request_child_test(img_request));
2535 obj_request = img_request->obj_request;
2536 rbd_assert(obj_request);
2537 rbd_assert(obj_request->img_request);
2539 obj_request->result = img_request->result;
2540 if (obj_request->result)
2544 * We need to zero anything beyond the parent overlap
2545 * boundary. Since rbd_img_obj_request_read_callback()
2546 * will zero anything beyond the end of a short read, an
2547 * easy way to do this is to pretend the data from the
2548 * parent came up short--ending at the overlap boundary.
/* Guard the addition below against wrapping past U64_MAX. */
2550 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2551 obj_end = obj_request->img_offset + obj_request->length;
2552 rbd_dev = obj_request->img_request->rbd_dev;
2553 if (obj_end > rbd_dev->parent_overlap) {
2556 if (obj_request->img_offset < rbd_dev->parent_overlap)
2557 xferred = rbd_dev->parent_overlap -
2558 obj_request->img_offset;
2560 obj_request->xferred = min(img_request->xferred, xferred);
2562 obj_request->xferred = img_request->xferred;
/* Put the parent image request, then finish the original object request. */
2565 rbd_img_request_put(img_request);
2566 rbd_img_obj_request_read_callback(obj_request);
2567 rbd_obj_request_complete(obj_request);
/*
 * Redirect an object read whose result is -ENOENT to the parent image.
 *
 * Creates an image request on rbd_dev->parent covering the object
 * request's image offset and length, takes a reference on the object
 * request and links it from the new image request, and fills the image
 * request from the object request's own bio list or page array
 * (depending on its type).  The image request is then submitted with
 * rbd_img_parent_read_callback() as its callback.
 *
 * The trailing statements put the image request, store result in the
 * object request with zero bytes transferred, and mark it done.
 * NOTE(review): presumably this is the failure path -- confirm.
 */
2570 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2572 struct rbd_device *rbd_dev;
2573 struct rbd_img_request *img_request;
2576 rbd_assert(obj_request_img_data_test(obj_request));
2577 rbd_assert(obj_request->img_request != NULL);
2578 rbd_assert(obj_request->result == (s32) -ENOENT);
2579 rbd_assert(obj_request_type_valid(obj_request->type));
2581 rbd_dev = obj_request->img_request->rbd_dev;
2582 rbd_assert(rbd_dev->parent != NULL);
2583 /* rbd_read_finish(obj_request, obj_request->length); */
2584 img_request = rbd_img_request_create(rbd_dev->parent,
2585 obj_request->img_offset,
2586 obj_request->length,
2592 rbd_obj_request_get(obj_request);
2593 img_request->obj_request = obj_request;
2595 if (obj_request->type == OBJ_REQUEST_BIO)
2596 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2597 obj_request->bio_list);
2599 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2600 obj_request->pages);
2604 img_request->callback = rbd_img_parent_read_callback;
2605 result = rbd_img_request_submit(img_request);
2612 rbd_img_request_put(img_request);
2613 obj_request->result = result;
2614 obj_request->xferred = 0;
2615 obj_request_done_set(obj_request);
/*
 * Send a CEPH_OSD_OP_NOTIFY_ACK for notify_id on the object named
 * rbd_dev->header_name.
 *
 * A data-less object request (OBJ_REQUEST_NODATA) is created, given an
 * osd request, and submitted without waiting; its callback is
 * rbd_obj_request_put().  The trailing rbd_obj_request_put() call is
 * made here as well.
 * NOTE(review): presumably that last put only runs on failure and ret
 * is returned -- confirm.
 */
2618 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2620 struct rbd_obj_request *obj_request;
2621 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2624 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2625 OBJ_REQUEST_NODATA);
2630 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2631 if (!obj_request->osd_req)
2633 obj_request->callback = rbd_obj_request_put;
2635 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2637 rbd_osd_req_format_read(obj_request);
2639 ret = rbd_obj_request_submit(osdc, obj_request);
2642 rbd_obj_request_put(obj_request);
/*
 * Watch event callback; rbd_dev_header_watch_sync() registers it with
 * ceph_osdc_create_event(), passing the rbd_device as data.
 *
 * Refreshes the device through rbd_dev_refresh(), logs a warning
 * carrying the refresh result, and acknowledges the notification with
 * rbd_obj_notify_ack().
 * NOTE(review): the warning is presumably emitted only on failure.
 * NOTE(review): the result of rbd_obj_notify_ack() is not examined.
 */
2647 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2649 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2655 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2656 rbd_dev->header_name, (unsigned long long)notify_id,
2657 (unsigned int)opcode);
2658 ret = rbd_dev_refresh(rbd_dev);
2660 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2662 rbd_obj_notify_ack(rbd_dev, notify_id);
2666 * Request sync osd watch/unwatch. The value of "start" determines
2667 * whether a watch request is being initiated or torn down.
/*
 * Set up (start is true) or tear down (start is false) the watch on the
 * header object, waiting for the osd request to finish.
 *
 * The asserts require watch_event and watch_request to both be unset
 * when starting and both be set when stopping.  An event bound to
 * rbd_watch_cb() is created with ceph_osdc_create_event().  The object
 * request carries a CEPH_OSD_OP_WATCH op with the event cookie and a
 * flag of 1 for start, 0 for stop.  The osd request is either marked
 * lingering, or the earlier watch request is unregistered as a linger
 * request.  On a successful start the object request is remembered in
 * rbd_dev->watch_request; on teardown the remembered request is put and
 * cleared, and the event is cancelled and cleared.
 * NOTE(review): the branches are presumably keyed on start and ret --
 * confirm.
 */
2669 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2671 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2672 struct rbd_obj_request *obj_request;
2675 rbd_assert(start ^ !!rbd_dev->watch_event);
2676 rbd_assert(start ^ !!rbd_dev->watch_request);
2679 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2680 &rbd_dev->watch_event);
2683 rbd_assert(rbd_dev->watch_event != NULL);
2687 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2688 OBJ_REQUEST_NODATA);
2692 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2693 if (!obj_request->osd_req)
2697 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2699 ceph_osdc_unregister_linger_request(osdc,
2700 rbd_dev->watch_request->osd_req);
2702 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2703 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2704 rbd_osd_req_format_write(obj_request);
2706 ret = rbd_obj_request_submit(osdc, obj_request);
2709 ret = rbd_obj_request_wait(obj_request);
2712 ret = obj_request->result;
2717 * A watch request is set to linger, so the underlying osd
2718 * request won't go away until we unregister it. We retain
2719 * a pointer to the object request during that time (in
2720 * rbd_dev->watch_request), so we'll keep a reference to
2721 * it. We'll drop that reference (below) after we've
2725 rbd_dev->watch_request = obj_request;
2730 /* We have successfully torn down the watch request */
2732 rbd_obj_request_put(rbd_dev->watch_request);
2733 rbd_dev->watch_request = NULL;
2735 /* Cancel the event if we're tearing down, or on error */
2736 ceph_osdc_cancel_event(rbd_dev->watch_event);
2737 rbd_dev->watch_event = NULL;
2739 rbd_obj_request_put(obj_request);
2745 * Synchronous osd object method call. Returns the number of bytes
2746 * returned in the outbound buffer, or a negative error code.
/*
 * Call method method_name of class class_name on object object_name
 * using a CEPH_OSD_OP_CALL op, and wait for the request to finish.
 *
 * outbound / outbound_size: optional parameter data for the method; when
 *	outbound_size is non-zero it is appended to a newly allocated
 *	pagelist that is attached as the request data.
 * inbound_size: size of the reply area; a page vector of
 *	calc_pages_for(0, inbound_size) pages receives the response, and
 *	obj_request->xferred bytes are then copied from it into inbound.
 *
 * ret is set to the transfer count (asserted to be below INT_MAX) after
 * the copy; a failure to allocate the page vector returns PTR_ERR().
 */
2748 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2749 const char *object_name,
2750 const char *class_name,
2751 const char *method_name,
2752 const void *outbound,
2753 size_t outbound_size,
2755 size_t inbound_size)
2757 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2758 struct rbd_obj_request *obj_request;
2759 struct page **pages;
2764 * Method calls are ultimately read operations. The result
2765 * should placed into the inbound buffer provided. They
2766 * also supply outbound data--parameters for the object
2767 * method. Currently if this is present it will be a
2770 page_count = (u32)calc_pages_for(0, inbound_size);
2771 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2773 return PTR_ERR(pages);
2776 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2781 obj_request->pages = pages;
2782 obj_request->page_count = page_count;
2784 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2785 if (!obj_request->osd_req)
2788 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2789 class_name, method_name);
2790 if (outbound_size) {
2791 struct ceph_pagelist *pagelist;
2793 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2797 ceph_pagelist_init(pagelist);
/* NOTE(review): the result of ceph_pagelist_append() is not captured; confirm it cannot fail. */
2798 ceph_pagelist_append(pagelist, outbound, outbound_size);
2799 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2802 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2803 obj_request->pages, inbound_size,
2805 rbd_osd_req_format_read(obj_request);
2807 ret = rbd_obj_request_submit(osdc, obj_request);
2810 ret = rbd_obj_request_wait(obj_request);
2814 ret = obj_request->result;
2818 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2819 ret = (int)obj_request->xferred;
2820 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2823 rbd_obj_request_put(obj_request);
2825 ceph_release_page_vector(pages, page_count);
/*
 * Request function for the rbd request queue; rbd_init_disk() passes it
 * to blk_init_queue().  The sparse annotations record that the queue
 * lock is released and re-acquired inside.
 *
 * Each request taken with blk_fetch_request() is handled as follows:
 *  - non-REQ_TYPE_FS and zero-length requests are ended at once with
 *    status 0;
 *  - otherwise the queue lock is released, the request is checked
 *    (write to a read-only mapping, vanished snapshot, range overflow,
 *    range past mapping.size), and an image request is created, filled
 *    with OBJ_REQUEST_BIO data, and submitted;
 *  - the queue lock is retaken, followed by a warning and
 *    __blk_end_request_all(rq, result).
 *    NOTE(review): presumably only when result is non-zero -- confirm.
 *
 * NOTE(review): the dout() for a non-existent snapshot has no trailing
 * newline, unlike the other dout() calls in this function.
 */
2830 static void rbd_request_fn(struct request_queue *q)
2831 __releases(q->queue_lock) __acquires(q->queue_lock)
2833 struct rbd_device *rbd_dev = q->queuedata;
2834 bool read_only = rbd_dev->mapping.read_only;
2838 while ((rq = blk_fetch_request(q))) {
2839 bool write_request = rq_data_dir(rq) == WRITE;
2840 struct rbd_img_request *img_request;
2844 /* Ignore any non-FS requests that filter through. */
2846 if (rq->cmd_type != REQ_TYPE_FS) {
2847 dout("%s: non-fs request type %d\n", __func__,
2848 (int) rq->cmd_type);
2849 __blk_end_request_all(rq, 0);
2853 /* Ignore/skip any zero-length requests */
2855 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2856 length = (u64) blk_rq_bytes(rq);
2859 dout("%s: zero-length request\n", __func__);
2860 __blk_end_request_all(rq, 0);
2864 spin_unlock_irq(q->queue_lock);
2866 /* Disallow writes to a read-only device */
2868 if (write_request) {
2872 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2876 * Quit early if the mapped snapshot no longer
2877 * exists. It's still possible the snapshot will
2878 * have disappeared by the time our request arrives
2879 * at the osd, but there's no sense in sending it if
2882 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2883 dout("request for non-existent snapshot");
2884 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2890 if (offset && length > U64_MAX - offset + 1) {
2891 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2893 goto end_request; /* Shouldn't happen */
2897 if (offset + length > rbd_dev->mapping.size) {
2898 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2899 offset, length, rbd_dev->mapping.size);
2904 img_request = rbd_img_request_create(rbd_dev, offset, length,
2905 write_request, false);
2909 img_request->rq = rq;
2911 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2914 result = rbd_img_request_submit(img_request);
2916 rbd_img_request_put(img_request);
2918 spin_lock_irq(q->queue_lock);
2920 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2921 write_request ? "write" : "read",
2922 length, offset, result);
2924 __blk_end_request_all(rq, result);
2930 * a queue callback. Makes sure that we don't create a bio that spans across
2931 * multiple osd objects. One exception is a single-page bio,
2932 * which we handle later in bio_chain_clone_range()
/*
 * merge_bvec callback; rbd_init_disk() installs it with
 * blk_queue_merge_bvec().
 *
 * Works out how far the bio's start sector (made device-relative by
 * adding get_start_sect()) lies into its rbd object, where an object
 * holds 1 << (obj_order - SECTOR_SHIFT) sectors.  From that it computes
 * the bytes left up to the end of the object, subtracts what the bio
 * already holds (bmd->bi_size), and caps the answer at bvec->bv_len.
 * An empty bio always gets bvec->bv_len.
 * NOTE(review): the byte count is held in ret and presumably returned.
 */
2934 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2935 struct bio_vec *bvec)
2937 struct rbd_device *rbd_dev = q->queuedata;
2938 sector_t sector_offset;
2939 sector_t sectors_per_obj;
2940 sector_t obj_sector_offset;
2944 * Find how far into its rbd object the partition-relative
2945 * bio start sector is to offset relative to the enclosing
2948 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2949 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2950 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2953 * Compute the number of bytes from that offset to the end
2954 * of the object. Account for what's already used by the bio.
2956 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2957 if (ret > bmd->bi_size)
2958 ret -= bmd->bi_size;
2963 * Don't send back more than was asked for. And if the bio
2964 * was empty, let the whole thing through because: "Note
2965 * that a block device *must* allow a single page to be
2966 * added to an empty bio."
2968 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2969 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2970 ret = (int) bvec->bv_len;
/*
 * Detach the gendisk from rbd_dev and tear it down.
 *
 * rbd_dev->disk is cleared first.  The disk's GENHD_FL_UP flag is
 * tested, and blk_cleanup_queue() is called on the disk's queue.
 * NOTE(review): the remaining teardown calls are not shown here --
 * confirm what else is released.
 */
2975 static void rbd_free_disk(struct rbd_device *rbd_dev)
2977 struct gendisk *disk = rbd_dev->disk;
2982 rbd_dev->disk = NULL;
2983 if (disk->flags & GENHD_FL_UP) {
2986 blk_cleanup_queue(disk->queue);
/*
 * Read length bytes at offset from object object_name into buf, waiting
 * for the osd request to finish.
 *
 * A page vector sized by calc_pages_for(offset, length) backs a
 * CEPH_OSD_OP_READ extent op.  After the wait, obj_request->xferred
 * bytes are copied from the page vector into buf.  The object request
 * is put and the page vector released at the end.
 * NOTE(review): presumably returns the byte count on success or a
 * negative errno -- confirm.
 */
2991 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2992 const char *object_name,
2993 u64 offset, u64 length, void *buf)
2996 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2997 struct rbd_obj_request *obj_request;
2998 struct page **pages = NULL;
3003 page_count = (u32) calc_pages_for(offset, length);
3004 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3006 ret = PTR_ERR(pages);
3009 obj_request = rbd_obj_request_create(object_name, offset, length,
3014 obj_request->pages = pages;
3015 obj_request->page_count = page_count;
3017 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3018 if (!obj_request->osd_req)
3021 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3022 offset, length, 0, 0);
3023 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3025 obj_request->length,
3026 obj_request->offset & ~PAGE_MASK,
3028 rbd_osd_req_format_read(obj_request);
3030 ret = rbd_obj_request_submit(osdc, obj_request);
3033 ret = rbd_obj_request_wait(obj_request);
3037 ret = obj_request->result;
3041 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3042 size = (size_t) obj_request->xferred;
3043 ceph_copy_from_page_vector(pages, buf, 0, size);
3044 rbd_assert(size <= (size_t)INT_MAX);
3048 rbd_obj_request_put(obj_request);
3050 ceph_release_page_vector(pages, page_count);
3056 * Read the complete header for the given rbd device. On successful
3057 * return, the rbd_dev->header field will contain up-to-date
3058 * information about the image.
/*
 * Read and decode the on-disk header of a format 1 image.
 *
 * The buffer is sized for the fixed header plus snap_count snapshot
 * records and read with rbd_obj_read_sync().  A short read or a header
 * rejected by rbd_dev_ondisk_valid() produces a warning.  The loop
 * repeats until the snapshot count found in the header equals the count
 * the buffer was sized for; the result is then handed to
 * rbd_header_from_disk().
 * NOTE(review): the short-read warning formats size with %zd; if size
 * is a size_t, %zu is the matching conversion -- confirm.
 */
3060 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3062 struct rbd_image_header_ondisk *ondisk = NULL;
3069 * The complete header will include an array of its 64-bit
3070 * snapshot ids, followed by the names of those snapshots as
3071 * a contiguous block of NUL-terminated strings. Note that
3072 * the number of snapshots could change by the time we read
3073 * it in, in which case we re-read it.
3080 size = sizeof (*ondisk);
3081 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3083 ondisk = kmalloc(size, GFP_KERNEL);
3087 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3091 if ((size_t)ret < size) {
3093 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3097 if (!rbd_dev_ondisk_valid(ondisk)) {
3099 rbd_warn(rbd_dev, "invalid header");
3103 names_size = le64_to_cpu(ondisk->snap_names_len);
3104 want_count = snap_count;
3105 snap_count = le32_to_cpu(ondisk->snap_count);
3106 } while (snap_count != want_count);
3108 ret = rbd_header_from_disk(rbd_dev, ondisk);
3116 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3117 * has disappeared from the (just updated) snapshot context.
/*
 * Re-check the RBD_DEV_FLAG_EXISTS flag of a mapped snapshot.
 *
 * The flag is tested first, and the mapped snap_id is compared with
 * CEPH_NOSNAP.  When rbd_dev_snap_index() reports BAD_SNAP_INDEX for
 * the mapped snap_id, the flag is cleared.
 * NOTE(review): the first two tests presumably return early -- confirm.
 */
3119 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3123 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3126 snap_id = rbd_dev->spec->snap_id;
3127 if (snap_id == CEPH_NOSNAP)
3130 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3131 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
/*
 * Re-read the image header and propagate a size change to the disk.
 *
 * With ctl_mutex held (taken with SINGLE_DEPTH_NESTING), the format 1
 * or format 2 header reader is called according to
 * rbd_dev->image_format and rbd_exists_validate() is run.  After the
 * mutex is released, a changed mapping size is converted to sectors and
 * applied with set_capacity() followed by revalidate_disk().
 * NOTE(review): mapping.size is cast to sector_t before the division by
 * SECTOR_SIZE; if sector_t is narrower than mapping.size the value is
 * truncated before dividing -- confirm the types.
 */
3134 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3139 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3140 mapping_size = rbd_dev->mapping.size;
3141 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3142 if (rbd_dev->image_format == 1)
3143 ret = rbd_dev_v1_header_info(rbd_dev);
3145 ret = rbd_dev_v2_header_info(rbd_dev);
3147 /* If it's a mapped snapshot, validate its EXISTS flag */
3149 rbd_exists_validate(rbd_dev);
3150 mutex_unlock(&ctl_mutex);
3151 if (mapping_size != rbd_dev->mapping.size) {
3154 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3155 dout("setting size to %llu sectors", (unsigned long long)size);
3156 set_capacity(rbd_dev->disk, size);
3157 revalidate_disk(rbd_dev->disk);
/*
 * Allocate and configure the gendisk and request queue for rbd_dev.
 *
 * The disk is allocated with RBD_MINORS_PER_MAJOR minors, named
 * RBD_DRV_NAME followed by a number, uses rbd_dev->major with first
 * minor 0 and rbd_bd_ops, and points back to rbd_dev via private_data.
 * The queue is driven by rbd_request_fn() under rbd_dev->lock.  Its
 * physical block size is SECTOR_SIZE, and its max hw sectors, max
 * segment size, io_min and io_opt all derive from
 * rbd_obj_bytes(&rbd_dev->header).  rbd_merge_bvec() is installed as
 * the merge_bvec callback, and the queue and rbd_dev are linked to each
 * other through q->queuedata and rbd_dev->disk.
 */
3163 static int rbd_init_disk(struct rbd_device *rbd_dev)
3165 struct gendisk *disk;
3166 struct request_queue *q;
3169 /* create gendisk info */
3170 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3174 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3176 disk->major = rbd_dev->major;
3177 disk->first_minor = 0;
3178 disk->fops = &rbd_bd_ops;
3179 disk->private_data = rbd_dev;
3181 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3185 /* We use the default size, but let's be explicit about it. */
3186 blk_queue_physical_block_size(q, SECTOR_SIZE);
3188 /* set io sizes to object size */
3189 segment_size = rbd_obj_bytes(&rbd_dev->header);
3190 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3191 blk_queue_max_segment_size(q, segment_size);
3192 blk_queue_io_min(q, segment_size);
3193 blk_queue_io_opt(q, segment_size);
3195 blk_queue_merge_bvec(q, rbd_merge_bvec);
3198 q->queuedata = rbd_dev;
3200 rbd_dev->disk = disk;
/*
 * Return the rbd_device that embeds the given struct device as its
 * "dev" member (container_of() lookup).
 */
3213 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3215 return container_of(dev, struct rbd_device, dev);
/*
 * sysfs show handler for "size": prints rbd_dev->mapping.size in
 * decimal followed by a newline, returning the sprintf() count.
 */
3218 static ssize_t rbd_size_show(struct device *dev,
3219 struct device_attribute *attr, char *buf)
3221 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3223 return sprintf(buf, "%llu\n",
3224 (unsigned long long)rbd_dev->mapping.size);
3228 * Note this shows the features for whatever's mapped, which is not
3229 * necessarily the base image.
/*
 * sysfs show handler for "features": prints rbd_dev->mapping.features
 * as a zero-padded 16-digit hexadecimal value with a 0x prefix.
 */
3231 static ssize_t rbd_features_show(struct device *dev,
3232 struct device_attribute *attr, char *buf)
3234 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3236 return sprintf(buf, "0x%016llx\n",
3237 (unsigned long long)rbd_dev->mapping.features);
/*
 * sysfs show handler for "major": prints rbd_dev->major in decimal, or
 * the text "(none)".
 * NOTE(review): "(none)" is presumably used when no major number is
 * set -- confirm the condition.
 */
3240 static ssize_t rbd_major_show(struct device *dev,
3241 struct device_attribute *attr, char *buf)
3243 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3246 return sprintf(buf, "%d\n", rbd_dev->major);
3248 return sprintf(buf, "(none)\n");
/*
 * sysfs show handler for "client_id": prints "client" followed by the
 * value of ceph_client_id() for the device's ceph client.
 */
3252 static ssize_t rbd_client_id_show(struct device *dev,
3253 struct device_attribute *attr, char *buf)
3255 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3257 return sprintf(buf, "client%lld\n",
3258 ceph_client_id(rbd_dev->rbd_client->client));
/*
 * sysfs show handler for "pool": prints rbd_dev->spec->pool_name.
 */
3261 static ssize_t rbd_pool_show(struct device *dev,
3262 struct device_attribute *attr, char *buf)
3264 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3266 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
/*
 * sysfs show handler for "pool_id": prints rbd_dev->spec->pool_id in
 * decimal.
 */
3269 static ssize_t rbd_pool_id_show(struct device *dev,
3270 struct device_attribute *attr, char *buf)
3272 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3274 return sprintf(buf, "%llu\n",
3275 (unsigned long long) rbd_dev->spec->pool_id);
/*
 * sysfs show handler for "name": prints rbd_dev->spec->image_name when
 * it is set, otherwise the text "(unknown)".
 */
3278 static ssize_t rbd_name_show(struct device *dev,
3279 struct device_attribute *attr, char *buf)
3281 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3283 if (rbd_dev->spec->image_name)
3284 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3286 return sprintf(buf, "(unknown)\n");
/*
 * sysfs show handler for "image_id": prints rbd_dev->spec->image_id.
 */
3289 static ssize_t rbd_image_id_show(struct device *dev,
3290 struct device_attribute *attr, char *buf)
3292 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3294 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3298 * Shows the name of the currently-mapped snapshot (or
3299 * RBD_SNAP_HEAD_NAME for the base image).
/*
 * sysfs show handler for "current_snap": prints
 * rbd_dev->spec->snap_name.
 */
3301 static ssize_t rbd_snap_show(struct device *dev,
3302 struct device_attribute *attr,
3305 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3307 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3311 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3312 * for the parent image. If there is no parent, simply shows
3313 * "(no parent image)".
/*
 * sysfs show handler for "parent".
 *
 * Prints "(no parent image)", or else a sequence of lines taken from
 * rbd_dev->parent_spec: pool id and name, image id and name (with
 * "(unknown)" standing in for a missing image name), snapshot id and
 * name, and finally the parent overlap.  The return value is the
 * distance between the write cursor bufp and buf.
 * NOTE(review): the "(no parent image)" case is presumably taken when
 * parent_spec is NULL, and bufp presumably advances by count after each
 * sprintf() -- confirm.
 * NOTE(review): parent_overlap is passed to %llu without the
 * (unsigned long long) cast used for the other 64-bit values here.
 */
3315 static ssize_t rbd_parent_show(struct device *dev,
3316 struct device_attribute *attr,
3319 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3320 struct rbd_spec *spec = rbd_dev->parent_spec;
3325 return sprintf(buf, "(no parent image)\n");
3327 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3328 (unsigned long long) spec->pool_id, spec->pool_name);
3333 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3334 spec->image_name ? spec->image_name : "(unknown)");
3339 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3340 (unsigned long long) spec->snap_id, spec->snap_name);
3345 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3350 return (ssize_t) (bufp - buf);
/*
 * sysfs store handler for "refresh": runs rbd_dev_refresh() on the
 * device and logs a warning carrying its result.  Returns the negative
 * result on failure, otherwise size.
 * NOTE(review): the warning is presumably emitted only on failure.
 */
3353 static ssize_t rbd_image_refresh(struct device *dev,
3354 struct device_attribute *attr,
3358 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3361 ret = rbd_dev_refresh(rbd_dev);
3363 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3365 return ret < 0 ? ret : size;
/*
 * sysfs attributes of an rbd device.  All are declared S_IRUGO with
 * only a show handler, except "refresh", which is declared S_IWUSR with
 * only a store handler.
 */
3368 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3369 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3370 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3371 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3372 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3373 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3374 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3375 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3376 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3377 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3378 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
/*
 * Table of the rbd device attributes declared with DEVICE_ATTR() above
 * it in this file.
 */
3380 static struct attribute *rbd_attrs[] = {
3381 &dev_attr_size.attr,
3382 &dev_attr_features.attr,
3383 &dev_attr_major.attr,
3384 &dev_attr_client_id.attr,
3385 &dev_attr_pool.attr,
3386 &dev_attr_pool_id.attr,
3387 &dev_attr_name.attr,
3388 &dev_attr_image_id.attr,
3389 &dev_attr_current_snap.attr,
3390 &dev_attr_parent.attr,
3391 &dev_attr_refresh.attr,
/* Attribute group for rbd devices.  NOTE(review): presumably wraps rbd_attrs -- confirm. */
3395 static struct attribute_group rbd_attr_group = {
/* Attribute group list referenced by rbd_device_type.groups. */
3399 static const struct attribute_group *rbd_attr_groups[] = {
/* Release callback referenced by rbd_device_type.release. */
3404 static void rbd_sysfs_dev_release(struct device *dev)
/*
 * Device type for rbd devices: supplies rbd_attr_groups as the
 * attribute groups and rbd_sysfs_dev_release() as the release callback.
 */
3408 static struct device_type rbd_device_type = {
3410 .groups = rbd_attr_groups,
3411 .release = rbd_sysfs_dev_release,
/*
 * Take a reference on spec through its kref.
 * NOTE(review): presumably returns spec itself -- confirm.
 */
3414 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3416 kref_get(&spec->kref);
/*
 * Drop a reference on spec; kref_put() is given rbd_spec_free() as the
 * release function, which is why it is forward-declared here.
 */
3421 static void rbd_spec_free(struct kref *kref);
3422 static void rbd_spec_put(struct rbd_spec *spec)
3425 kref_put(&spec->kref, rbd_spec_free);
/*
 * Allocate a zero-filled rbd_spec with GFP_KERNEL and initialize its
 * kref.
 * NOTE(review): presumably returns NULL when the allocation fails --
 * confirm.
 */
3428 static struct rbd_spec *rbd_spec_alloc(void)
3430 struct rbd_spec *spec;
3432 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3435 kref_init(&spec->kref);
/*
 * kref release function for an rbd_spec (passed to kref_put() by
 * rbd_spec_put()).  Frees the pool name, image id, image name and
 * snapshot name strings owned by the spec.
 * NOTE(review): the spec itself is presumably freed afterwards --
 * confirm.
 */
3440 static void rbd_spec_free(struct kref *kref)
3442 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3444 kfree(spec->pool_name);
3445 kfree(spec->image_id);
3446 kfree(spec->image_name);
3447 kfree(spec->snap_name);
/*
 * Allocate a zero-filled rbd_device and initialize it.
 *
 * Sets up the spinlock, list node and header rwsem, stores spec and
 * rbdc in the device, and fills in the ceph file layout: stripe unit
 * and object size are 1 << RBD_MAX_OBJ_ORDER, stripe count is 1, and
 * the pool is spec->pool_id converted to u32.
 * NOTE(review): no reference is taken on spec or rbdc in the lines
 * shown; ownership presumably passes to the new device -- confirm.
 */
3451 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3452 struct rbd_spec *spec)
3454 struct rbd_device *rbd_dev;
3456 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3460 spin_lock_init(&rbd_dev->lock);
3462 INIT_LIST_HEAD(&rbd_dev->node);
3463 init_rwsem(&rbd_dev->header_rwsem);
3465 rbd_dev->spec = spec;
3466 rbd_dev->rbd_client = rbdc;
3468 /* Initialize the layout used for all rbd requests */
3470 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3471 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3472 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3473 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/*
 * Release what rbd_dev_create() stored in the device: puts the rbd
 * client and the spec.
 * NOTE(review): the device structure is presumably freed afterwards --
 * confirm.
 */
3478 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3480 rbd_put_client(rbd_dev->rbd_client);
3481 rbd_spec_put(rbd_dev->spec);
3486 * Get the size and object order for an image snapshot, or if
3487 * snap_id is CEPH_NOSNAP, gets this information for the base
/*
 * Fetch the object order and size for snap_id from the header object.
 *
 * The little-endian snap_id is sent as the method's outbound data and
 * the reply lands in the packed size_buf.  A reply shorter than
 * size_buf is rejected by the length test.  On success *order receives
 * size_buf.order and *snap_size the CPU-endian size.
 * NOTE(review): the class and method names passed to
 * rbd_obj_method_sync() are not visible here -- confirm them.
 */
3490 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3491 u8 *order, u64 *snap_size)
3493 __le64 snapid = cpu_to_le64(snap_id);
3498 } __attribute__ ((packed)) size_buf = { 0 };
3500 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3502 &snapid, sizeof (snapid),
3503 &size_buf, sizeof (size_buf));
3504 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3507 if (ret < sizeof (size_buf))
3511 *order = size_buf.order;
3512 *snap_size = le64_to_cpu(size_buf.size);
3514 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3515 (unsigned long long)snap_id, (unsigned int)*order,
3516 (unsigned long long)*snap_size);
/*
 * Fetch the base image's object order and size (snap id CEPH_NOSNAP)
 * straight into rbd_dev->header via _rbd_dev_v2_snap_size().
 */
3521 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3523 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3524 &rbd_dev->header.obj_order,
3525 &rbd_dev->header.image_size);
/*
 * Fetch the object name prefix of a format 2 image.
 *
 * Calls the "get_object_prefix" method of class "rbd" on the header
 * object with a reply buffer of RBD_OBJ_PREFIX_LEN_MAX bytes, then
 * extracts the encoded string from the reply into
 * rbd_dev->header.object_prefix.  If extraction fails, the error is
 * taken from the returned pointer and object_prefix is reset to NULL.
 * NOTE(review): the reply buffer is allocated with GFP_KERNEL while the
 * extracted string uses GFP_NOIO -- confirm the mix is intended.
 */
3528 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3534 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3538 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3539 "rbd", "get_object_prefix", NULL, 0,
3540 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3541 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3546 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3547 p + ret, NULL, GFP_NOIO);
3550 if (IS_ERR(rbd_dev->header.object_prefix)) {
3551 ret = PTR_ERR(rbd_dev->header.object_prefix);
3552 rbd_dev->header.object_prefix = NULL;
3554 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for snap_id from the header object.
 *
 * Calls the "get_features" method of class "rbd" with the little-endian
 * snap_id as outbound data; the reply lands in the packed features_buf.
 * A reply shorter than features_buf is rejected by the length test, and
 * so is any incompat bit outside RBD_FEATURES_SUPPORTED.  Otherwise
 * *snap_features receives the CPU-endian features value.
 * NOTE(review): the error codes used for the two rejections are not
 * visible here.
 */
3562 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3565 __le64 snapid = cpu_to_le64(snap_id);
3569 } __attribute__ ((packed)) features_buf = { 0 };
3573 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3574 "rbd", "get_features",
3575 &snapid, sizeof (snapid),
3576 &features_buf, sizeof (features_buf));
3577 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3580 if (ret < sizeof (features_buf))
3583 incompat = le64_to_cpu(features_buf.incompat);
3584 if (incompat & ~RBD_FEATURES_SUPPORTED)
3587 *snap_features = le64_to_cpu(features_buf.features);
3589 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3590 (unsigned long long)snap_id,
3591 (unsigned long long)*snap_features,
3592 (unsigned long long)le64_to_cpu(features_buf.incompat));
/*
 * Fetch the base image's feature bits (snap id CEPH_NOSNAP) straight
 * into rbd_dev->header.features via _rbd_dev_v2_snap_features().
 */
3597 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3599 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3600 &rbd_dev->header.features);
/*
 * Query the "get_parent" method of class "rbd" on the header object and
 * record the parent image, if any.
 *
 * The reply is decoded as pool id, encoded image id string, snapshot id
 * and overlap.  A pool id of CEPH_NOPOOL means there is no parent, and
 * a pool id above U32_MAX is reported with a warning.  The decoded
 * values go into a newly allocated rbd_spec, which replaces
 * rbd_dev->parent_spec (the old one is put); the overlap is stored in
 * rbd_dev->parent_overlap.  A clone whose overlap is 0 gets a warning
 * that its parent is being ignored.  The final rbd_spec_put() releases
 * the new spec unless ownership was handed to rbd_dev, in which case
 * parent_spec has been set to NULL.
 * NOTE(review): the guards around the final stores are presumably based
 * on the overlap value -- confirm.
 */
3603 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3605 struct rbd_spec *parent_spec;
3607 void *reply_buf = NULL;
3616 parent_spec = rbd_spec_alloc();
3620 size = sizeof (__le64) + /* pool_id */
3621 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3622 sizeof (__le64) + /* snap_id */
3623 sizeof (__le64); /* overlap */
3624 reply_buf = kmalloc(size, GFP_KERNEL);
3630 snapid = cpu_to_le64(CEPH_NOSNAP);
3631 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3632 "rbd", "get_parent",
3633 &snapid, sizeof (snapid),
3635 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3640 end = reply_buf + ret;
3642 ceph_decode_64_safe(&p, end, pool_id, out_err);
3643 if (pool_id == CEPH_NOPOOL)
3644 goto out; /* No parent? No problem. */
3646 /* The ceph file layout needs to fit pool id in 32 bits */
3649 if (pool_id > (u64)U32_MAX) {
3650 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3651 (unsigned long long)pool_id, U32_MAX);
3654 parent_spec->pool_id = pool_id;
3656 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3657 if (IS_ERR(image_id)) {
3658 ret = PTR_ERR(image_id);
3661 parent_spec->image_id = image_id;
3662 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3663 ceph_decode_64_safe(&p, end, overlap, out_err);
3666 rbd_spec_put(rbd_dev->parent_spec);
3667 rbd_dev->parent_spec = parent_spec;
3668 parent_spec = NULL; /* rbd_dev now owns this */
3669 rbd_dev->parent_overlap = overlap;
3671 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3677 rbd_spec_put(parent_spec);
/*
 * Fetch the striping parameters of a format 2 image and check them.
 *
 * Calls the "get_stripe_unit_count" method of class "rbd" on the header
 * object; the reply lands in the packed striping_info_buf and is
 * decoded as two 64-bit values.  A stripe unit different from the
 * object size (1 << obj_order) or a stripe count other than 1 produces
 * an "unsupported" warning.  Accepted values are stored in
 * rbd_dev->header.stripe_unit and stripe_count.
 * NOTE(review): the error code returned for unsupported values is not
 * visible here.
 */
3682 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3686 __le64 stripe_count;
3687 } __attribute__ ((packed)) striping_info_buf = { 0 };
3688 size_t size = sizeof (striping_info_buf);
3695 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3696 "rbd", "get_stripe_unit_count", NULL, 0,
3697 (char *)&striping_info_buf, size);
3698 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3705 * We don't actually support the "fancy striping" feature
3706 * (STRIPINGV2) yet, but if the striping sizes are the
3707 * defaults the behavior is the same as before. So find
3708 * out, and only fail if the image has non-default values.
3711 obj_size = (u64)1 << rbd_dev->header.obj_order;
3712 p = &striping_info_buf;
3713 stripe_unit = ceph_decode_64(&p);
3714 if (stripe_unit != obj_size) {
3715 rbd_warn(rbd_dev, "unsupported stripe unit "
3716 "(got %llu want %llu)",
3717 stripe_unit, obj_size);
3720 stripe_count = ceph_decode_64(&p);
3721 if (stripe_count != 1) {
3722 rbd_warn(rbd_dev, "unsupported stripe count "
3723 "(got %llu want 1)", stripe_count);
3726 rbd_dev->header.stripe_unit = stripe_unit;
3727 rbd_dev->header.stripe_count = stripe_count;
/*
 * Look up the image name that belongs to rbd_dev->spec->image_id.
 *
 * Must only be called while spec->image_name is still unset (asserted).
 * The image id is written with ceph_encode_string() into a buffer of
 * sizeof (__le32) + strlen(image_id) bytes and sent as outbound data to
 * the "dir_get_name" method of class "rbd" on the RBD_DIRECTORY object.
 * The name is then extracted from the reply as an encoded string.
 * NOTE(review): presumably returns NULL on any failure (callers treat a
 * missing name as tolerable) -- confirm.
 */
3732 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3734 size_t image_id_size;
3739 void *reply_buf = NULL;
3741 char *image_name = NULL;
3744 rbd_assert(!rbd_dev->spec->image_name);
3746 len = strlen(rbd_dev->spec->image_id);
3747 image_id_size = sizeof (__le32) + len;
3748 image_id = kmalloc(image_id_size, GFP_KERNEL);
3753 end = image_id + image_id_size;
3754 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3756 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3757 reply_buf = kmalloc(size, GFP_KERNEL);
3761 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3762 "rbd", "dir_get_name",
3763 image_id, image_id_size,
3768 end = reply_buf + ret;
3770 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3771 if (IS_ERR(image_name))
3774 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
/*
 * Find the id of the snapshot called name in a format 1 image.
 *
 * Steps through rbd_dev->header.snap_names, a run of NUL-terminated
 * strings, in parallel with the ids in the snapshot context; the id at
 * the matching position is returned.
 * NOTE(review): presumably returns CEPH_NOSNAP when nothing matches --
 * confirm.
 */
3782 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3784 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3785 const char *snap_name;
3788 /* Skip over names until we find the one we are looking for */
3790 snap_name = rbd_dev->header.snap_names;
3791 while (which < snapc->num_snaps) {
3792 if (!strcmp(name, snap_name))
3793 return snapc->snaps[which];
3794 snap_name += strlen(snap_name) + 1;
/*
 * Find the id of the snapshot called name in a format 2 image.
 *
 * For each id in the snapshot context the snapshot name is fetched with
 * rbd_dev_v2_snap_name() and compared with name; the loop ends at the
 * first match.  Returns the matching id, or CEPH_NOSNAP when no name
 * matched.
 * NOTE(review): a failed name lookup presumably ends the search, and
 * the fetched name is presumably freed each round -- confirm.
 */
3800 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3802 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3807 for (which = 0; !found && which < snapc->num_snaps; which++) {
3808 const char *snap_name;
3810 snap_id = snapc->snaps[which];
3811 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3812 if (IS_ERR(snap_name))
3814 found = !strcmp(name, snap_name);
3817 return found ? snap_id : CEPH_NOSNAP;
3821 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3822 * no snapshot by that name is found, or if an error occurs.
/*
 * Dispatch a snapshot-name lookup to the format 1 helper when
 * rbd_dev->image_format is 1, and to the format 2 helper otherwise.
 */
3824 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3826 if (rbd_dev->image_format == 1)
3827 return rbd_v1_snap_id_by_name(rbd_dev, name);
3829 return rbd_v2_snap_id_by_name(rbd_dev, name);
3833 * When an rbd image has a parent image, it is identified by the
3834 * pool, image, and snapshot ids (not names). This function fills
3835 * in the names for those ids. (It's OK if we can't figure out the
3836 * name for an image id, but the pool and snapshot ids should always
3837 * exist and have names.) All names in an rbd spec are dynamically
3840 * When an image being mapped (not a parent) is probed, we have the
3841 * pool name and pool id, image name and image id, and the snapshot
3842 * name. The only thing we're missing is the snapshot id.
/*
 * Fill in whichever half of rbd_dev->spec is still missing.
 *
 * When spec->pool_name is already set, only the snapshot id is needed:
 * a snap_name other than RBD_SNAP_HEAD_NAME is resolved with
 * rbd_snap_id_by_name(), and the head name maps to CEPH_NOSNAP.
 *
 * Otherwise the names are derived from the ids: the pool name comes
 * from ceph_pg_pool_name_by_id() on the osd map and is duplicated with
 * kstrdup(), the image name from rbd_dev_image_name() (a warning is
 * logged if it cannot be obtained), and the snapshot name from
 * rbd_snap_name().  The three names are then stored in the spec.
 * NOTE(review): the error codes and cleanup on failure are not visible
 * here.
 */
3844 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3846 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3847 struct rbd_spec *spec = rbd_dev->spec;
3848 const char *pool_name;
3849 const char *image_name;
3850 const char *snap_name;
3854 * An image being mapped will have the pool name (etc.), but
3855 * we need to look up the snapshot id.
3857 if (spec->pool_name) {
3858 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3861 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3862 if (snap_id == CEPH_NOSNAP)
3864 spec->snap_id = snap_id;
3866 spec->snap_id = CEPH_NOSNAP;
3872 /* Get the pool name; we have to make our own copy of this */
3874 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3876 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3879 pool_name = kstrdup(pool_name, GFP_KERNEL);
3883 /* Fetch the image name; tolerate failure here */
3885 image_name = rbd_dev_image_name(rbd_dev);
3887 rbd_warn(rbd_dev, "unable to get image name");
3889 /* Look up the snapshot name, and make a copy */
3891 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3897 spec->pool_name = pool_name;
3898 spec->image_name = image_name;
3899 spec->snap_name = snap_name;
3909 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3918 struct ceph_snap_context *snapc;
3922 * We'll need room for the seq value (maximum snapshot id),
3923 * snapshot count, and array of that many snapshot ids.
3924 * For now we have a fixed upper limit on the number we're
3925 * prepared to receive.
3927 size = sizeof (__le64) + sizeof (__le32) +
3928 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3929 reply_buf = kzalloc(size, GFP_KERNEL);
3933 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3934 "rbd", "get_snapcontext", NULL, 0,
3936 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3941 end = reply_buf + ret;
3943 ceph_decode_64_safe(&p, end, seq, out);
3944 ceph_decode_32_safe(&p, end, snap_count, out);
3947 * Make sure the reported number of snapshot ids wouldn't go
3948 * beyond the end of our buffer. But before checking that,
3949 * make sure the computed size of the snapshot context we
3950 * allocate is representable in a size_t.
3952 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3957 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3961 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3967 for (i = 0; i < snap_count; i++)
3968 snapc->snaps[i] = ceph_decode_64(&p);
3970 ceph_put_snap_context(rbd_dev->header.snapc);
3971 rbd_dev->header.snapc = snapc;
3973 dout(" snap context seq = %llu, snap_count = %u\n",
3974 (unsigned long long)seq, (unsigned int)snap_count);
3981 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3992 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3993 reply_buf = kmalloc(size, GFP_KERNEL);
3995 return ERR_PTR(-ENOMEM);
3997 snapid = cpu_to_le64(snap_id);
3998 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3999 "rbd", "get_snapshot_name",
4000 &snapid, sizeof (snapid),
4002 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4004 snap_name = ERR_PTR(ret);
4009 end = reply_buf + ret;
4010 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4011 if (IS_ERR(snap_name))
4014 dout(" snap_id 0x%016llx snap_name = %s\n",
4015 (unsigned long long)snap_id, snap_name);
4022 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4024 bool first_time = rbd_dev->header.object_prefix == NULL;
4027 down_write(&rbd_dev->header_rwsem);
4030 ret = rbd_dev_v2_header_onetime(rbd_dev);
4036 * If the image supports layering, get the parent info. We
4037 * need to probe the first time regardless. Thereafter we
4038 * only need to if there's a parent, to see if it has
4039 * disappeared due to the mapped image getting flattened.
4041 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4042 (first_time || rbd_dev->parent_spec)) {
4045 ret = rbd_dev_v2_parent_info(rbd_dev);
4050 * Print a warning if this is the initial probe and
4051 * the image has a parent. Don't print it if the
4052 * image now being probed is itself a parent. We
4053 * can tell at this point because we won't know its
4054 * pool name yet (just its pool id).
4056 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4057 if (first_time && warn)
4058 rbd_warn(rbd_dev, "WARNING: kernel layering "
4059 "is EXPERIMENTAL!");
4062 ret = rbd_dev_v2_image_size(rbd_dev);
4066 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4067 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4068 rbd_dev->mapping.size = rbd_dev->header.image_size;
4070 ret = rbd_dev_v2_snap_context(rbd_dev);
4071 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4073 up_write(&rbd_dev->header_rwsem);
4078 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4083 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4085 dev = &rbd_dev->dev;
4086 dev->bus = &rbd_bus_type;
4087 dev->type = &rbd_device_type;
4088 dev->parent = &rbd_root_dev;
4089 dev->release = rbd_dev_device_release;
4090 dev_set_name(dev, "%d", rbd_dev->dev_id);
4091 ret = device_register(dev);
4093 mutex_unlock(&ctl_mutex);
4098 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4100 device_unregister(&rbd_dev->dev);
4103 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4106 * Get a unique rbd identifier for the given new rbd_dev, and add
4107 * the rbd_dev to the global list. The minimum rbd id is 1.
4109 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4111 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4113 spin_lock(&rbd_dev_list_lock);
4114 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4115 spin_unlock(&rbd_dev_list_lock);
4116 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4117 (unsigned long long) rbd_dev->dev_id);
4121 * Remove an rbd_dev from the global list, and record that its
4122 * identifier is no longer in use.
4124 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4126 struct list_head *tmp;
4127 int rbd_id = rbd_dev->dev_id;
4130 rbd_assert(rbd_id > 0);
4132 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4133 (unsigned long long) rbd_dev->dev_id);
4134 spin_lock(&rbd_dev_list_lock);
4135 list_del_init(&rbd_dev->node);
4138 * If the id being "put" is not the current maximum, there
4139 * is nothing special we need to do.
4141 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4142 spin_unlock(&rbd_dev_list_lock);
4147 * We need to update the current maximum id. Search the
4148 * list to find out what it is. We're more likely to find
4149 * the maximum at the end, so search the list backward.
4152 list_for_each_prev(tmp, &rbd_dev_list) {
4153 struct rbd_device *rbd_dev;
4155 rbd_dev = list_entry(tmp, struct rbd_device, node);
4156 if (rbd_dev->dev_id > max_id)
4157 max_id = rbd_dev->dev_id;
4159 spin_unlock(&rbd_dev_list_lock);
4162 * The max id could have been updated by rbd_dev_id_get(), in
4163 * which case it now accurately reflects the new maximum.
4164 * Be careful not to overwrite the maximum value in that
4167 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4168 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* Find start of token */
	*buf = start;

	return strcspn(start, whitespace);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	/* Copy only if the token and its terminator both fit */
	if (len < token_size) {
		memcpy(token, start, len);
		token[len] = '\0';
	}
	*buf = start + len;

	return len;
}
4221 * Finds the next token in *buf, dynamically allocates a buffer big
4222 * enough to hold a copy of it, and copies the token into the new
4223 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4224 * that a duplicate buffer is created even for a zero-length token.
4226 * Returns a pointer to the newly-allocated duplicate, or a null
4227 * pointer if memory for the duplicate was not available. If
4228 * the lenp argument is a non-null pointer, the length of the token
4229 * (not including the '\0') is returned in *lenp.
4231 * If successful, the *buf pointer will be updated to point beyond
4232 * the end of the found token.
4234 * Note: uses GFP_KERNEL for allocation.
4236 static inline char *dup_token(const char **buf, size_t *lenp)
4241 len = next_token(buf);
4242 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4245 *(dup + len) = '\0';
4255 * Parse the options provided for an "rbd add" (i.e., rbd image
4256 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4257 * and the data written is passed here via a NUL-terminated buffer.
4258 * Returns 0 if successful or an error code otherwise.
4260 * The information extracted from these options is recorded in
4261 * the other parameters which return dynamically-allocated
4264 * The address of a pointer that will refer to a ceph options
4265 * structure. Caller must release the returned pointer using
4266 * ceph_destroy_options() when it is no longer needed.
4268 * Address of an rbd options pointer. Fully initialized by
4269 * this function; caller must release with kfree().
4271 * Address of an rbd image specification pointer. Fully
4272 * initialized by this function based on parsed options.
4273 * Caller must release with rbd_spec_put().
4275 * The options passed take this form:
4276 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4279 * A comma-separated list of one or more monitor addresses.
4280 * A monitor address is an ip address, optionally followed
4281 * by a port number (separated by a colon).
4282 * I.e.: ip1[:port1][,ip2[:port2]...]
4284 * A comma-separated list of ceph and/or rbd options.
4286 * The name of the rados pool containing the rbd image.
4288 * The name of the image in that pool to map.
4290 * An optional snapshot id. If provided, the mapping will
4291 * present data from the image at the time that snapshot was
4292 * created. The image head is used if no snapshot id is
4293 * provided. Snapshot mappings are always read-only.
4295 static int rbd_add_parse_args(const char *buf,
4296 struct ceph_options **ceph_opts,
4297 struct rbd_options **opts,
4298 struct rbd_spec **rbd_spec)
4302 const char *mon_addrs;
4304 size_t mon_addrs_size;
4305 struct rbd_spec *spec = NULL;
4306 struct rbd_options *rbd_opts = NULL;
4307 struct ceph_options *copts;
4310 /* The first four tokens are required */
4312 len = next_token(&buf);
4314 rbd_warn(NULL, "no monitor address(es) provided");
4318 mon_addrs_size = len + 1;
4322 options = dup_token(&buf, NULL);
4326 rbd_warn(NULL, "no options provided");
4330 spec = rbd_spec_alloc();
4334 spec->pool_name = dup_token(&buf, NULL);
4335 if (!spec->pool_name)
4337 if (!*spec->pool_name) {
4338 rbd_warn(NULL, "no pool name provided");
4342 spec->image_name = dup_token(&buf, NULL);
4343 if (!spec->image_name)
4345 if (!*spec->image_name) {
4346 rbd_warn(NULL, "no image name provided");
4351 * Snapshot name is optional; default is to use "-"
4352 * (indicating the head/no snapshot).
4354 len = next_token(&buf);
4356 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4357 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4358 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4359 ret = -ENAMETOOLONG;
4362 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4365 *(snap_name + len) = '\0';
4366 spec->snap_name = snap_name;
4368 /* Initialize all rbd options to the defaults */
4370 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4374 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4376 copts = ceph_parse_options(options, mon_addrs,
4377 mon_addrs + mon_addrs_size - 1,
4378 parse_rbd_opts_token, rbd_opts);
4379 if (IS_ERR(copts)) {
4380 ret = PTR_ERR(copts);
4401 * An rbd format 2 image has a unique identifier, distinct from the
4402 * name given to it by the user. Internally, that identifier is
4403 * what's used to specify the names of objects related to the image.
4405 * A special "rbd id" object is used to map an rbd image name to its
4406 * id. If that object doesn't exist, then there is no v2 rbd image
4407 * with the supplied name.
4409 * This function will record the given rbd_dev's image_id field if
4410 * it can be determined, and in that case will return 0. If any
4411 * errors occur a negative errno will be returned and the rbd_dev's
4412 * image_id field will be unchanged (and should be NULL).
4414 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4423 * When probing a parent image, the image id is already
4424 * known (and the image name likely is not). There's no
4425 * need to fetch the image id again in this case. We
4426 * do still need to set the image format though.
4428 if (rbd_dev->spec->image_id) {
4429 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4435 * First, see if the format 2 image id file exists, and if
4436 * so, get the image's persistent id from it.
4438 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4439 object_name = kmalloc(size, GFP_NOIO);
4442 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4443 dout("rbd id object name is %s\n", object_name);
4445 /* Response will be an encoded string, which includes a length */
4447 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4448 response = kzalloc(size, GFP_NOIO);
4454 /* If it doesn't exist we'll assume it's a format 1 image */
4456 ret = rbd_obj_method_sync(rbd_dev, object_name,
4457 "rbd", "get_id", NULL, 0,
4458 response, RBD_IMAGE_ID_LEN_MAX);
4459 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4460 if (ret == -ENOENT) {
4461 image_id = kstrdup("", GFP_KERNEL);
4462 ret = image_id ? 0 : -ENOMEM;
4464 rbd_dev->image_format = 1;
4465 } else if (ret > sizeof (__le32)) {
4468 image_id = ceph_extract_encoded_string(&p, p + ret,
4470 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4472 rbd_dev->image_format = 2;
4478 rbd_dev->spec->image_id = image_id;
4479 dout("image_id is %s\n", image_id);
4488 /* Undo whatever state changes are made by v1 or v2 image probe */
4490 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4492 struct rbd_image_header *header;
4494 rbd_dev_remove_parent(rbd_dev);
4495 rbd_spec_put(rbd_dev->parent_spec);
4496 rbd_dev->parent_spec = NULL;
4497 rbd_dev->parent_overlap = 0;
4499 /* Free dynamic fields from the header, then zero it out */
4501 header = &rbd_dev->header;
4502 ceph_put_snap_context(header->snapc);
4503 kfree(header->snap_sizes);
4504 kfree(header->snap_names);
4505 kfree(header->object_prefix);
4506 memset(header, 0, sizeof (*header));
4509 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4513 ret = rbd_dev_v2_object_prefix(rbd_dev);
4518 * Get the and check features for the image. Currently the
4519 * features are assumed to never change.
4521 ret = rbd_dev_v2_features(rbd_dev);
4525 /* If the image supports fancy striping, get its parameters */
4527 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4528 ret = rbd_dev_v2_striping_info(rbd_dev);
4532 /* No support for crypto and compression type format 2 images */
4536 rbd_dev->header.features = 0;
4537 kfree(rbd_dev->header.object_prefix);
4538 rbd_dev->header.object_prefix = NULL;
4543 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4545 struct rbd_device *parent = NULL;
4546 struct rbd_spec *parent_spec;
4547 struct rbd_client *rbdc;
4550 if (!rbd_dev->parent_spec)
4553 * We need to pass a reference to the client and the parent
4554 * spec when creating the parent rbd_dev. Images related by
4555 * parent/child relationships always share both.
4557 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4558 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4561 parent = rbd_dev_create(rbdc, parent_spec);
4565 ret = rbd_dev_image_probe(parent, false);
4568 rbd_dev->parent = parent;
4573 rbd_spec_put(rbd_dev->parent_spec);
4574 kfree(rbd_dev->header_name);
4575 rbd_dev_destroy(parent);
4577 rbd_put_client(rbdc);
4578 rbd_spec_put(parent_spec);
4584 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4588 /* generate unique id: find highest unique id, add one */
4589 rbd_dev_id_get(rbd_dev);
4591 /* Fill in the device name, now that we have its id. */
4592 BUILD_BUG_ON(DEV_NAME_LEN
4593 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4594 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4596 /* Get our block major device number. */
4598 ret = register_blkdev(0, rbd_dev->name);
4601 rbd_dev->major = ret;
4603 /* Set up the blkdev mapping. */
4605 ret = rbd_init_disk(rbd_dev);
4607 goto err_out_blkdev;
4609 ret = rbd_dev_mapping_set(rbd_dev);
4612 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4614 ret = rbd_bus_add_dev(rbd_dev);
4616 goto err_out_mapping;
4618 /* Everything's ready. Announce the disk to the world. */
4620 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4621 add_disk(rbd_dev->disk);
4623 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4624 (unsigned long long) rbd_dev->mapping.size);
4629 rbd_dev_mapping_clear(rbd_dev);
4631 rbd_free_disk(rbd_dev);
4633 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4635 rbd_dev_id_put(rbd_dev);
4636 rbd_dev_mapping_clear(rbd_dev);
4641 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4643 struct rbd_spec *spec = rbd_dev->spec;
4646 /* Record the header object name for this rbd image. */
4648 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4650 if (rbd_dev->image_format == 1)
4651 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4653 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4655 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4656 if (!rbd_dev->header_name)
4659 if (rbd_dev->image_format == 1)
4660 sprintf(rbd_dev->header_name, "%s%s",
4661 spec->image_name, RBD_SUFFIX);
4663 sprintf(rbd_dev->header_name, "%s%s",
4664 RBD_HEADER_PREFIX, spec->image_id);
4668 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4670 rbd_dev_unprobe(rbd_dev);
4671 kfree(rbd_dev->header_name);
4672 rbd_dev->header_name = NULL;
4673 rbd_dev->image_format = 0;
4674 kfree(rbd_dev->spec->image_id);
4675 rbd_dev->spec->image_id = NULL;
4677 rbd_dev_destroy(rbd_dev);
4681 * Probe for the existence of the header object for the given rbd
4682 * device. If this image is the one being mapped (i.e., not a
4683 * parent), initiate a watch on its header object before using that
4684 * object to get detailed information about the rbd image.
4686 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4692 * Get the id from the image id object. If it's not a
4693 * format 2 image, we'll get ENOENT back, and we'll assume
4694 * it's a format 1 image.
4696 ret = rbd_dev_image_id(rbd_dev);
4699 rbd_assert(rbd_dev->spec->image_id);
4700 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4702 ret = rbd_dev_header_name(rbd_dev);
4704 goto err_out_format;
4707 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4709 goto out_header_name;
4712 if (rbd_dev->image_format == 1)
4713 ret = rbd_dev_v1_header_info(rbd_dev);
4715 ret = rbd_dev_v2_header_info(rbd_dev);
4719 ret = rbd_dev_spec_update(rbd_dev);
4723 ret = rbd_dev_probe_parent(rbd_dev);
4727 dout("discovered format %u image, header name is %s\n",
4728 rbd_dev->image_format, rbd_dev->header_name);
4732 rbd_dev_unprobe(rbd_dev);
4735 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4737 rbd_warn(rbd_dev, "unable to tear down "
4738 "watch request (%d)\n", tmp);
4741 kfree(rbd_dev->header_name);
4742 rbd_dev->header_name = NULL;
4744 rbd_dev->image_format = 0;
4745 kfree(rbd_dev->spec->image_id);
4746 rbd_dev->spec->image_id = NULL;
4748 dout("probe failed, returning %d\n", ret);
4753 static ssize_t rbd_add(struct bus_type *bus,
4757 struct rbd_device *rbd_dev = NULL;
4758 struct ceph_options *ceph_opts = NULL;
4759 struct rbd_options *rbd_opts = NULL;
4760 struct rbd_spec *spec = NULL;
4761 struct rbd_client *rbdc;
4762 struct ceph_osd_client *osdc;
4766 if (!try_module_get(THIS_MODULE))
4769 /* parse add command */
4770 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4772 goto err_out_module;
4773 read_only = rbd_opts->read_only;
4775 rbd_opts = NULL; /* done with this */
4777 rbdc = rbd_get_client(ceph_opts);
4782 ceph_opts = NULL; /* rbd_dev client now owns this */
4785 osdc = &rbdc->client->osdc;
4786 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4788 goto err_out_client;
4789 spec->pool_id = (u64)rc;
4791 /* The ceph file layout needs to fit pool id in 32 bits */
4793 if (spec->pool_id > (u64)U32_MAX) {
4794 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4795 (unsigned long long)spec->pool_id, U32_MAX);
4797 goto err_out_client;
4800 rbd_dev = rbd_dev_create(rbdc, spec);
4802 goto err_out_client;
4803 rbdc = NULL; /* rbd_dev now owns this */
4804 spec = NULL; /* rbd_dev now owns this */
4806 rc = rbd_dev_image_probe(rbd_dev, true);
4808 goto err_out_rbd_dev;
4810 /* If we are mapping a snapshot it must be marked read-only */
4812 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4814 rbd_dev->mapping.read_only = read_only;
4816 rc = rbd_dev_device_setup(rbd_dev);
4820 rbd_dev_image_release(rbd_dev);
4822 rbd_dev_destroy(rbd_dev);
4824 rbd_put_client(rbdc);
4827 ceph_destroy_options(ceph_opts);
4831 module_put(THIS_MODULE);
4833 dout("Error adding device %s\n", buf);
4838 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4840 struct list_head *tmp;
4841 struct rbd_device *rbd_dev;
4843 spin_lock(&rbd_dev_list_lock);
4844 list_for_each(tmp, &rbd_dev_list) {
4845 rbd_dev = list_entry(tmp, struct rbd_device, node);
4846 if (rbd_dev->dev_id == dev_id) {
4847 spin_unlock(&rbd_dev_list_lock);
4851 spin_unlock(&rbd_dev_list_lock);
4855 static void rbd_dev_device_release(struct device *dev)
4857 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4859 rbd_free_disk(rbd_dev);
4860 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4861 rbd_dev_mapping_clear(rbd_dev);
4862 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4864 rbd_dev_id_put(rbd_dev);
4865 rbd_dev_mapping_clear(rbd_dev);
4868 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4870 while (rbd_dev->parent) {
4871 struct rbd_device *first = rbd_dev;
4872 struct rbd_device *second = first->parent;
4873 struct rbd_device *third;
4876 * Follow to the parent with no grandparent and
4879 while (second && (third = second->parent)) {
4884 rbd_dev_image_release(second);
4885 first->parent = NULL;
4886 first->parent_overlap = 0;
4888 rbd_assert(first->parent_spec);
4889 rbd_spec_put(first->parent_spec);
4890 first->parent_spec = NULL;
4894 static ssize_t rbd_remove(struct bus_type *bus,
4898 struct rbd_device *rbd_dev = NULL;
4903 ret = strict_strtoul(buf, 10, &ul);
4907 /* convert to int; abort if we lost anything in the conversion */
4908 target_id = (int) ul;
4909 if (target_id != ul)
4912 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4914 rbd_dev = __rbd_get_dev(target_id);
4920 spin_lock_irq(&rbd_dev->lock);
4921 if (rbd_dev->open_count)
4924 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4925 spin_unlock_irq(&rbd_dev->lock);
4928 rbd_bus_del_dev(rbd_dev);
4929 ret = rbd_dev_header_watch_sync(rbd_dev, false);
4931 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4932 rbd_dev_image_release(rbd_dev);
4933 module_put(THIS_MODULE);
4936 mutex_unlock(&ctl_mutex);
4942 * create control files in sysfs
4945 static int rbd_sysfs_init(void)
4949 ret = device_register(&rbd_root_dev);
4953 ret = bus_register(&rbd_bus_type);
4955 device_unregister(&rbd_root_dev);
4960 static void rbd_sysfs_cleanup(void)
4962 bus_unregister(&rbd_bus_type);
4963 device_unregister(&rbd_root_dev);
4966 static int rbd_slab_init(void)
4968 rbd_assert(!rbd_img_request_cache);
4969 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4970 sizeof (struct rbd_img_request),
4971 __alignof__(struct rbd_img_request),
4973 if (!rbd_img_request_cache)
4976 rbd_assert(!rbd_obj_request_cache);
4977 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4978 sizeof (struct rbd_obj_request),
4979 __alignof__(struct rbd_obj_request),
4981 if (!rbd_obj_request_cache)
4984 rbd_assert(!rbd_segment_name_cache);
4985 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4986 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4987 if (rbd_segment_name_cache)
4990 if (rbd_obj_request_cache) {
4991 kmem_cache_destroy(rbd_obj_request_cache);
4992 rbd_obj_request_cache = NULL;
4995 kmem_cache_destroy(rbd_img_request_cache);
4996 rbd_img_request_cache = NULL;
5001 static void rbd_slab_exit(void)
5003 rbd_assert(rbd_segment_name_cache);
5004 kmem_cache_destroy(rbd_segment_name_cache);
5005 rbd_segment_name_cache = NULL;
5007 rbd_assert(rbd_obj_request_cache);
5008 kmem_cache_destroy(rbd_obj_request_cache);
5009 rbd_obj_request_cache = NULL;
5011 rbd_assert(rbd_img_request_cache);
5012 kmem_cache_destroy(rbd_img_request_cache);
5013 rbd_img_request_cache = NULL;
5016 static int __init rbd_init(void)
5020 if (!libceph_compatible(NULL)) {
5021 rbd_warn(NULL, "libceph incompatibility (quitting)");
5025 rc = rbd_slab_init();
5028 rc = rbd_sysfs_init();
5032 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5037 static void __exit rbd_exit(void)
5039 rbd_sysfs_cleanup();
5043 module_init(rbd_init);
5044 module_exit(rbd_exit);
5046 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5047 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5048 MODULE_DESCRIPTION("rados block device");
5050 /* following authorship retained from original osdblk.c */
5051 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5053 MODULE_LICENSE("GPL");