/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
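/*
 * For example, with 4-byte ints MAX_INT_FORMAT_WIDTH works out to
 * (5 * 4) / 2 + 1 == 11: each byte contributes at most ~2.41 decimal
 * digits (5/2 over-approximates that), and the extra 1 covers a sign.
 * "-2147483648" is exactly 11 characters, so DEV_NAME_LEN (32) easily
 * holds RBD_DRV_NAME plus any such identifier plus a terminating NUL.
 */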
#define RBD_READ_ONLY_DEFAULT		false
96 * block device image metadata (in-memory version)
98 struct rbd_image_header {
99 /* These four fields never change for a given rbd image */
106 /* The remaining fields need to be updated occasionally */
108 struct ceph_snap_context *snapc;
116 * An rbd image specification.
118 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
128 size_t image_name_len;
141 * an instance of the client. multiple devices may share an rbd client.
144 struct ceph_client *client;
146 struct list_head node;
150 * a request completion status
152 struct rbd_req_status {
159 * a collection of requests
161 struct rbd_req_coll {
165 struct rbd_req_status status[0];
169 * a single io request
172 struct request *rq; /* blk layer request */
173 struct bio *bio; /* cloned bio */
174 struct page **pages; /* list of used pages */
177 struct rbd_req_coll *coll;
184 struct list_head node;
199 int dev_id; /* blkdev unique id */
201 int major; /* blkdev assigned major */
202 struct gendisk *disk; /* blkdev's gendisk and rq */
204 u32 image_format; /* Either 1 or 2 */
205 struct rbd_client *rbd_client;
207 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
209 spinlock_t lock; /* queue lock */
211 struct rbd_image_header header;
213 struct rbd_spec *spec;
217 struct ceph_osd_event *watch_event;
218 struct ceph_osd_request *watch_request;
220 /* protects updating the header */
221 struct rw_semaphore header_rwsem;
223 struct rbd_mapping mapping;
225 struct list_head node;
227 /* list of snapshots */
228 struct list_head snaps;
234 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
236 static LIST_HEAD(rbd_dev_list); /* devices */
237 static DEFINE_SPINLOCK(rbd_dev_list_lock);
239 static LIST_HEAD(rbd_client_list); /* clients */
240 static DEFINE_SPINLOCK(rbd_client_list_lock);
242 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
243 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
245 static void rbd_dev_release(struct device *dev);
246 static void rbd_remove_snap_dev(struct rbd_snap *snap);
248 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
250 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
253 static struct bus_attribute rbd_bus_attrs[] = {
254 __ATTR(add, S_IWUSR, NULL, rbd_add),
255 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
259 static struct bus_type rbd_bus_type = {
261 .bus_attrs = rbd_bus_attrs,
264 static void rbd_root_dev_release(struct device *dev)
268 static struct device rbd_root_dev = {
270 .release = rbd_root_dev_release,
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
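/*
 * A minimal usage sketch (illustrative): validate an assumption that
 * should never fail in practice, e.g. that a computed length fits:
 *
 *	rbd_assert(length <= U64_MAX - offset);
 *
 * With RBD_DEBUG defined a failed assertion is reported loudly;
 * without it the statement compiles to ((void) 0), so the expression
 * is not even evaluated.
 */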
286 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
288 return get_device(&rbd_dev->dev);
291 static void rbd_put_dev(struct rbd_device *rbd_dev)
293 put_device(&rbd_dev->dev);
296 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
297 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
299 static int rbd_open(struct block_device *bdev, fmode_t mode)
301 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
303 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
306 rbd_get_dev(rbd_dev);
307 set_device_ro(bdev, rbd_dev->mapping.read_only);
312 static int rbd_release(struct gendisk *disk, fmode_t mode)
314 struct rbd_device *rbd_dev = disk->private_data;
316 rbd_put_dev(rbd_dev);
321 static const struct block_device_operations rbd_bd_ops = {
322 .owner = THIS_MODULE,
324 .release = rbd_release,
328 * Initialize an rbd client instance.
331 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
333 struct rbd_client *rbdc;
336 dout("rbd_client_create\n");
337 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
341 kref_init(&rbdc->kref);
342 INIT_LIST_HEAD(&rbdc->node);
344 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
346 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
347 if (IS_ERR(rbdc->client))
349 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
351 ret = ceph_open_session(rbdc->client);
355 spin_lock(&rbd_client_list_lock);
356 list_add_tail(&rbdc->node, &rbd_client_list);
357 spin_unlock(&rbd_client_list_lock);
359 mutex_unlock(&ctl_mutex);
361 dout("rbd_client_create created %p\n", rbdc);
365 ceph_destroy_client(rbdc->client);
367 mutex_unlock(&ctl_mutex);
371 ceph_destroy_options(ceph_opts);
376 * Find a ceph client with specific addr and configuration. If
377 * found, bump its reference count.
379 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
381 struct rbd_client *client_node;
384 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
387 spin_lock(&rbd_client_list_lock);
388 list_for_each_entry(client_node, &rbd_client_list, node) {
389 if (!ceph_compare_options(ceph_opts, client_node->client)) {
390 kref_get(&client_node->kref);
395 spin_unlock(&rbd_client_list_lock);
397 return found ? client_node : NULL;
407 /* string args above */
410 /* Boolean args above */
414 static match_table_t rbd_opts_tokens = {
416 /* string args above */
417 {Opt_read_only, "read_only"},
418 {Opt_read_only, "ro"}, /* Alternate spelling */
419 {Opt_read_write, "read_write"},
420 {Opt_read_write, "rw"}, /* Alternate spelling */
421 /* Boolean args above */
425 static int parse_rbd_opts_token(char *c, void *private)
427 struct rbd_options *rbd_opts = private;
428 substring_t argstr[MAX_OPT_ARGS];
429 int token, intval, ret;
431 token = match_token(c, rbd_opts_tokens, argstr);
435 if (token < Opt_last_int) {
436 ret = match_int(&argstr[0], &intval);
438 pr_err("bad mount option arg (not int) "
442 dout("got int token %d val %d\n", token, intval);
443 } else if (token > Opt_last_int && token < Opt_last_string) {
444 dout("got string token %d val %s\n", token,
446 } else if (token > Opt_last_string && token < Opt_last_bool) {
447 dout("got Boolean token %d\n", token);
449 dout("got token %d\n", token);
454 rbd_opts->read_only = true;
457 rbd_opts->read_only = false;
467 * Get a ceph client with specific addr and configuration, if one does
468 * not exist create it.
470 static int rbd_get_client(struct rbd_device *rbd_dev,
471 struct ceph_options *ceph_opts)
473 struct rbd_client *rbdc;
475 rbdc = rbd_client_find(ceph_opts);
477 /* using an existing client */
478 ceph_destroy_options(ceph_opts);
480 rbdc = rbd_client_create(ceph_opts);
482 return PTR_ERR(rbdc);
484 rbd_dev->rbd_client = rbdc;
490 * Destroy ceph client
492 * Caller must hold rbd_client_list_lock.
494 static void rbd_client_release(struct kref *kref)
496 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
498 dout("rbd_release_client %p\n", rbdc);
499 spin_lock(&rbd_client_list_lock);
500 list_del(&rbdc->node);
501 spin_unlock(&rbd_client_list_lock);
503 ceph_destroy_client(rbdc->client);
508 * Drop reference to ceph client node. If it's not referenced anymore, release
511 static void rbd_put_client(struct rbd_device *rbd_dev)
513 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
514 rbd_dev->rbd_client = NULL;
518 * Destroy requests collection
520 static void rbd_coll_release(struct kref *kref)
522 struct rbd_req_coll *coll =
523 container_of(kref, struct rbd_req_coll, kref);
525 dout("rbd_coll_release %p\n", coll);
529 static bool rbd_image_format_valid(u32 image_format)
531 return image_format == 1 || image_format == 2;
534 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
539 /* The header has to start with the magic rbd header text */
540 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
543 /* The bio layer requires at least sector-sized I/O */
545 if (ondisk->options.order < SECTOR_SHIFT)
548 /* If we use u64 in a few spots we may be able to loosen this */
550 if (ondisk->options.order > 8 * sizeof (int) - 1)
554 * The size of a snapshot header has to fit in a size_t, and
555 * that limits the number of snapshots.
557 snap_count = le32_to_cpu(ondisk->snap_count);
558 size = SIZE_MAX - sizeof (struct ceph_snap_context);
559 if (snap_count > size / sizeof (__le64))
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
566 size -= snap_count * sizeof (__le64);
567 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
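	/*
	 * To make the overflow checks above concrete: on a 32-bit
	 * system the first test bounds snap_count to roughly
	 * (SIZE_MAX - header size) / 8, i.e. about half a billion
	 * entries, so computing
	 * sizeof (struct ceph_snap_context) + snap_count * sizeof (__le64)
	 * can never wrap, and whatever room remains must still be
	 * enough to hold the snapshot names.
	 */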
574 * Create a new header structure, translate header format from the on-disk
577 static int rbd_header_from_disk(struct rbd_image_header *header,
578 struct rbd_image_header_ondisk *ondisk)
585 memset(header, 0, sizeof (*header));
587 snap_count = le32_to_cpu(ondisk->snap_count);
589 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
590 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
591 if (!header->object_prefix)
593 memcpy(header->object_prefix, ondisk->object_prefix, len);
594 header->object_prefix[len] = '\0';
597 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
599 /* Save a copy of the snapshot names */
601 if (snap_names_len > (u64) SIZE_MAX)
603 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
604 if (!header->snap_names)
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
612 memcpy(header->snap_names, &ondisk->snaps[snap_count],
615 /* Record each snapshot's size */
617 size = snap_count * sizeof (*header->snap_sizes);
618 header->snap_sizes = kmalloc(size, GFP_KERNEL);
619 if (!header->snap_sizes)
621 for (i = 0; i < snap_count; i++)
622 header->snap_sizes[i] =
623 le64_to_cpu(ondisk->snaps[i].image_size);
625 WARN_ON(ondisk->snap_names_len);
626 header->snap_names = NULL;
627 header->snap_sizes = NULL;
630 header->features = 0; /* No features support in v1 images */
631 header->obj_order = ondisk->options.order;
632 header->crypt_type = ondisk->options.crypt_type;
633 header->comp_type = ondisk->options.comp_type;
635 /* Allocate and fill in the snapshot context */
637 header->image_size = le64_to_cpu(ondisk->image_size);
638 size = sizeof (struct ceph_snap_context);
639 size += snap_count * sizeof (header->snapc->snaps[0]);
640 header->snapc = kzalloc(size, GFP_KERNEL);
644 atomic_set(&header->snapc->nref, 1);
645 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
646 header->snapc->num_snaps = snap_count;
647 for (i = 0; i < snap_count; i++)
648 header->snapc->snaps[i] =
649 le64_to_cpu(ondisk->snaps[i].id);
654 kfree(header->snap_sizes);
655 header->snap_sizes = NULL;
656 kfree(header->snap_names);
657 header->snap_names = NULL;
658 kfree(header->object_prefix);
659 header->object_prefix = NULL;
664 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
667 struct rbd_snap *snap;
669 list_for_each_entry(snap, &rbd_dev->snaps, node) {
670 if (!strcmp(snap_name, snap->name)) {
671 rbd_dev->spec->snap_id = snap->id;
672 rbd_dev->mapping.size = snap->size;
673 rbd_dev->mapping.features = snap->features;
682 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
686 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
687 sizeof (RBD_SNAP_HEAD_NAME))) {
688 rbd_dev->spec->snap_id = CEPH_NOSNAP;
689 rbd_dev->mapping.size = rbd_dev->header.image_size;
690 rbd_dev->mapping.features = rbd_dev->header.features;
693 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
696 rbd_dev->mapping.read_only = true;
698 rbd_dev->exists = true;
703 static void rbd_header_free(struct rbd_image_header *header)
705 kfree(header->object_prefix);
706 header->object_prefix = NULL;
707 kfree(header->snap_sizes);
708 header->snap_sizes = NULL;
709 kfree(header->snap_names);
710 header->snap_names = NULL;
711 ceph_put_snap_context(header->snapc);
712 header->snapc = NULL;
715 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
721 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
724 segment = offset >> rbd_dev->header.obj_order;
725 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
726 rbd_dev->header.object_prefix, segment);
727 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
728 pr_err("error formatting segment name for #%llu (%d)\n",
737 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
739 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
741 return offset & (segment_size - 1);
744 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
745 u64 offset, u64 length)
747 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
749 offset &= segment_size - 1;
751 rbd_assert(length <= U64_MAX - offset);
752 if (offset + length > segment_size)
753 length = segment_size - offset;
758 static int rbd_get_num_segments(struct rbd_image_header *header,
766 if (len - 1 > U64_MAX - ofs)
769 start_seg = ofs >> header->obj_order;
770 end_seg = (ofs + len - 1) >> header->obj_order;
772 return end_seg - start_seg + 1;
776 * returns the size of an object in the image
778 static u64 rbd_obj_bytes(struct rbd_image_header *header)
780 return 1 << header->obj_order;
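/*
 * Worked example of the segment arithmetic above, assuming the common
 * default object order of 22 (4 MiB objects): a 1 MiB request at byte
 * offset 7 MiB maps to segment 7 MiB >> 22 == 1, begins at
 * 7 MiB & (4 MiB - 1) == 3 MiB within that object, and since
 * 3 MiB + 1 MiB does not exceed the 4 MiB object size its length is
 * not truncated; rbd_get_num_segments() likewise reports one segment.
 */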
787 static void bio_chain_put(struct bio *chain)
793 chain = chain->bi_next;
 * zeros a bio chain, starting at a specific offset
801 static void zero_bio_chain(struct bio *chain, int start_ofs)
810 bio_for_each_segment(bv, chain, i) {
811 if (pos + bv->bv_len > start_ofs) {
812 int remainder = max(start_ofs - pos, 0);
813 buf = bvec_kmap_irq(bv, &flags);
814 memset(buf + remainder, 0,
815 bv->bv_len - remainder);
816 bvec_kunmap_irq(buf, &flags);
821 chain = chain->bi_next;
826 * Clone a portion of a bio, starting at the given byte offset
827 * and continuing for the number of bytes indicated.
829 static struct bio *bio_clone_range(struct bio *bio_src,
838 unsigned short end_idx;
842 /* Handle the easy case for the caller */
844 if (!offset && len == bio_src->bi_size)
845 return bio_clone(bio_src, gfpmask);
847 if (WARN_ON_ONCE(!len))
849 if (WARN_ON_ONCE(len > bio_src->bi_size))
851 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
854 /* Find first affected segment... */
857 __bio_for_each_segment(bv, bio_src, idx, 0) {
858 if (resid < bv->bv_len)
864 /* ...and the last affected segment */
867 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
868 if (resid <= bv->bv_len)
872 vcnt = end_idx - idx + 1;
874 /* Build the clone */
876 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
878 return NULL; /* ENOMEM */
880 bio->bi_bdev = bio_src->bi_bdev;
881 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
882 bio->bi_rw = bio_src->bi_rw;
883 bio->bi_flags |= 1 << BIO_CLONED;
886 * Copy over our part of the bio_vec, then update the first
887 * and last (or only) entries.
889 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
890 vcnt * sizeof (struct bio_vec));
891 bio->bi_io_vec[0].bv_offset += voff;
893 bio->bi_io_vec[0].bv_len -= voff;
894 bio->bi_io_vec[vcnt - 1].bv_len = resid;
896 bio->bi_io_vec[0].bv_len = len;
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
920 static struct bio *bio_chain_clone_range(struct bio **bio_src,
921 unsigned int *offset,
925 struct bio *bi = *bio_src;
926 unsigned int off = *offset;
927 struct bio *chain = NULL;
930 /* Build up a chain of clone bios up to the limit */
932 if (!bi || off >= bi->bi_size || !len)
933 return NULL; /* Nothing to clone */
937 unsigned int bi_size;
941 goto out_err; /* EINVAL; ran out of bio's */
942 bi_size = min_t(unsigned int, bi->bi_size - off, len);
943 bio = bio_clone_range(bi, off, bi_size, gfpmask);
945 goto out_err; /* ENOMEM */
951 if (off == bi->bi_size) {
962 bio_chain_put(chain);
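/*
 * A minimal usage sketch for bio_chain_clone_range() (illustrative
 * only, not called anywhere): peel object-sized pieces off a request's
 * bio chain one at a time, much as rbd_rq_fn() does further below.
 * The object_size parameter and the GFP_NOIO choice are assumptions
 * made for the example.
 */
static __maybe_unused void bio_chain_clone_range_example(struct bio *bio,
					u64 img_offset, u64 img_length,
					u64 object_size)
{
	unsigned int offset = 0;

	while (img_length) {
		/* Bytes left in the object this piece starts in */
		u64 piece = min_t(u64, img_length,
			object_size - (img_offset & (object_size - 1)));
		struct bio *chain;

		chain = bio_chain_clone_range(&bio, &offset,
					(unsigned int) piece, GFP_NOIO);
		if (!chain)
			break;			/* ENOMEM */

		/*
		 * A real caller would attach "chain" to an osd request
		 * here; this sketch just drops its reference.
		 */
		bio_chain_put(chain);
		img_offset += piece;
		img_length -= piece;
	}
}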
968 * helpers for osd request op vectors.
970 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
971 int opcode, u32 payload_len)
973 struct ceph_osd_req_op *ops;
975 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
982 * op extent offset and length will be set later on
983 * in calc_raw_layout()
985 ops[0].payload_len = payload_len;
990 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
995 static void rbd_coll_end_req_index(struct request *rq,
996 struct rbd_req_coll *coll,
1000 struct request_queue *q;
1003 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1004 coll, index, ret, (unsigned long long) len);
1010 blk_end_request(rq, ret, len);
1016 spin_lock_irq(q->queue_lock);
1017 coll->status[index].done = 1;
1018 coll->status[index].rc = ret;
1019 coll->status[index].bytes = len;
1020 max = min = coll->num_done;
1021 while (max < coll->total && coll->status[max].done)
1024 for (i = min; i<max; i++) {
1025 __blk_end_request(rq, coll->status[i].rc,
1026 coll->status[i].bytes);
1028 kref_put(&coll->kref, rbd_coll_release);
1030 spin_unlock_irq(q->queue_lock);
1033 static void rbd_coll_end_req(struct rbd_request *req,
1036 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1040 * Send ceph osd request
1042 static int rbd_do_request(struct request *rq,
1043 struct rbd_device *rbd_dev,
1044 struct ceph_snap_context *snapc,
1046 const char *object_name, u64 ofs, u64 len,
1048 struct page **pages,
1051 struct ceph_osd_req_op *ops,
1052 struct rbd_req_coll *coll,
1054 void (*rbd_cb)(struct ceph_osd_request *req,
1055 struct ceph_msg *msg),
1056 struct ceph_osd_request **linger_req,
1059 struct ceph_osd_request *req;
1060 struct ceph_file_layout *layout;
1063 struct timespec mtime = CURRENT_TIME;
1064 struct rbd_request *req_data;
1065 struct ceph_osd_request_head *reqhead;
1066 struct ceph_osd_client *osdc;
1068 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1071 rbd_coll_end_req_index(rq, coll, coll_index,
1077 req_data->coll = coll;
1078 req_data->coll_index = coll_index;
1081 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1082 object_name, (unsigned long long) ofs,
1083 (unsigned long long) len, coll, coll_index);
1085 osdc = &rbd_dev->rbd_client->client->osdc;
1086 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1087 false, GFP_NOIO, pages, bio);
1093 req->r_callback = rbd_cb;
1096 req_data->bio = bio;
1097 req_data->pages = pages;
1098 req_data->len = len;
1100 req->r_priv = req_data;
1102 reqhead = req->r_request->front.iov_base;
1103 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1105 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1106 req->r_oid_len = strlen(req->r_oid);
1108 layout = &req->r_file_layout;
1109 memset(layout, 0, sizeof(*layout));
1110 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1111 layout->fl_stripe_count = cpu_to_le32(1);
1112 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1113 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
1114 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1116 rbd_assert(ret == 0);
1118 ceph_osdc_build_request(req, ofs, &len,
1122 req->r_oid, req->r_oid_len);
1125 ceph_osdc_set_request_linger(osdc, req);
1129 ret = ceph_osdc_start_request(osdc, req, false);
1134 ret = ceph_osdc_wait_request(osdc, req);
1136 *ver = le64_to_cpu(req->r_reassert_version.version);
1137 dout("reassert_ver=%llu\n",
1138 (unsigned long long)
1139 le64_to_cpu(req->r_reassert_version.version));
1140 ceph_osdc_put_request(req);
1145 bio_chain_put(req_data->bio);
1146 ceph_osdc_put_request(req);
1148 rbd_coll_end_req(req_data, ret, len);
1154 * Ceph osd op callback
1156 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1158 struct rbd_request *req_data = req->r_priv;
1159 struct ceph_osd_reply_head *replyhead;
1160 struct ceph_osd_op *op;
1166 replyhead = msg->front.iov_base;
1167 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1168 op = (void *)(replyhead + 1);
1169 rc = le32_to_cpu(replyhead->result);
1170 bytes = le64_to_cpu(op->extent.length);
1171 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1173 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1174 (unsigned long long) bytes, read_op, (int) rc);
1176 if (rc == -ENOENT && read_op) {
1177 zero_bio_chain(req_data->bio, 0);
1179 } else if (rc == 0 && read_op && bytes < req_data->len) {
1180 zero_bio_chain(req_data->bio, bytes);
1181 bytes = req_data->len;
1184 rbd_coll_end_req(req_data, rc, bytes);
1187 bio_chain_put(req_data->bio);
1189 ceph_osdc_put_request(req);
1193 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1195 ceph_osdc_put_request(req);
1199 * Do a synchronous ceph osd operation
1201 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1202 struct ceph_snap_context *snapc,
1205 struct ceph_osd_req_op *ops,
1206 const char *object_name,
1207 u64 ofs, u64 inbound_size,
1209 struct ceph_osd_request **linger_req,
1213 struct page **pages;
1216 rbd_assert(ops != NULL);
1218 num_pages = calc_pages_for(ofs, inbound_size);
1219 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1221 return PTR_ERR(pages);
1223 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1224 object_name, ofs, inbound_size, NULL,
1234 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1235 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1238 ceph_release_page_vector(pages, num_pages);
1243 * Do an asynchronous ceph osd operation
1245 static int rbd_do_op(struct request *rq,
1246 struct rbd_device *rbd_dev,
1247 struct ceph_snap_context *snapc,
1250 struct rbd_req_coll *coll,
1257 struct ceph_osd_req_op *ops;
1263 seg_name = rbd_segment_name(rbd_dev, ofs);
1266 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1267 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1269 if (rq_data_dir(rq) == WRITE) {
1270 opcode = CEPH_OSD_OP_WRITE;
1271 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1272 snapid = CEPH_NOSNAP;
1273 payload_len = seg_len;
1275 opcode = CEPH_OSD_OP_READ;
1276 flags = CEPH_OSD_FLAG_READ;
1278 snapid = rbd_dev->spec->snap_id;
1283 ops = rbd_create_rw_ops(1, opcode, payload_len);
1287 /* we've taken care of segment sizes earlier when we
1288 cloned the bios. We should never have a segment
1289 truncated at this point */
1290 rbd_assert(seg_len == len);
1292 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1293 seg_name, seg_ofs, seg_len,
1299 rbd_req_cb, 0, NULL);
1301 rbd_destroy_ops(ops);
1308 * Request sync osd read
1310 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1312 const char *object_name,
1317 struct ceph_osd_req_op *ops;
1320 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1324 ret = rbd_req_sync_op(rbd_dev, NULL,
1327 ops, object_name, ofs, len, buf, NULL, ver);
1328 rbd_destroy_ops(ops);
1334 * Request sync osd watch
1336 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1340 struct ceph_osd_req_op *ops;
1343 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1347 ops[0].watch.ver = cpu_to_le64(ver);
1348 ops[0].watch.cookie = notify_id;
1349 ops[0].watch.flag = 0;
1351 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1352 rbd_dev->header_name, 0, 0, NULL,
1357 rbd_simple_req_cb, 0, NULL);
1359 rbd_destroy_ops(ops);
1363 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1365 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1372 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1373 rbd_dev->header_name, (unsigned long long) notify_id,
1374 (unsigned int) opcode);
1375 rc = rbd_dev_refresh(rbd_dev, &hver);
1377 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1378 " update snaps: %d\n", rbd_dev->major, rc);
1380 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1384 * Request sync osd watch
1386 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1388 struct ceph_osd_req_op *ops;
1389 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1392 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1396 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1397 (void *)rbd_dev, &rbd_dev->watch_event);
1401 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1402 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1403 ops[0].watch.flag = 1;
1405 ret = rbd_req_sync_op(rbd_dev, NULL,
1407 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1409 rbd_dev->header_name,
1411 &rbd_dev->watch_request, NULL);
1416 rbd_destroy_ops(ops);
1420 ceph_osdc_cancel_event(rbd_dev->watch_event);
1421 rbd_dev->watch_event = NULL;
1423 rbd_destroy_ops(ops);
1428 * Request sync osd unwatch
1430 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1432 struct ceph_osd_req_op *ops;
1435 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1439 ops[0].watch.ver = 0;
1440 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1441 ops[0].watch.flag = 0;
1443 ret = rbd_req_sync_op(rbd_dev, NULL,
1445 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1447 rbd_dev->header_name,
1448 0, 0, NULL, NULL, NULL);
1451 rbd_destroy_ops(ops);
1452 ceph_osdc_cancel_event(rbd_dev->watch_event);
1453 rbd_dev->watch_event = NULL;
1458 * Synchronous osd object method call
1460 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1461 const char *object_name,
1462 const char *class_name,
1463 const char *method_name,
1464 const char *outbound,
1465 size_t outbound_size,
1467 size_t inbound_size,
1471 struct ceph_osd_req_op *ops;
1472 int class_name_len = strlen(class_name);
1473 int method_name_len = strlen(method_name);
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
1485 payload_size = class_name_len + method_name_len + outbound_size;
1486 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1490 ops[0].cls.class_name = class_name;
1491 ops[0].cls.class_len = (__u8) class_name_len;
1492 ops[0].cls.method_name = method_name;
1493 ops[0].cls.method_len = (__u8) method_name_len;
1494 ops[0].cls.argc = 0;
1495 ops[0].cls.indata = outbound;
1496 ops[0].cls.indata_len = outbound_size;
1498 ret = rbd_req_sync_op(rbd_dev, NULL,
1501 object_name, 0, inbound_size, inbound,
1504 rbd_destroy_ops(ops);
1506 dout("cls_exec returned %d\n", ret);
1510 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1512 struct rbd_req_coll *coll =
1513 kzalloc(sizeof(struct rbd_req_coll) +
1514 sizeof(struct rbd_req_status) * num_reqs,
1519 coll->total = num_reqs;
1520 kref_init(&coll->kref);
1525 * block device queue callback
1527 static void rbd_rq_fn(struct request_queue *q)
1529 struct rbd_device *rbd_dev = q->queuedata;
1532 while ((rq = blk_fetch_request(q))) {
1537 int num_segs, cur_seg = 0;
1538 struct rbd_req_coll *coll;
1539 struct ceph_snap_context *snapc;
1540 unsigned int bio_offset;
1542 dout("fetched request\n");
1544 /* filter out block requests we don't understand */
1545 if ((rq->cmd_type != REQ_TYPE_FS)) {
1546 __blk_end_request_all(rq, 0);
1550 /* deduce our operation (read, write) */
1551 do_write = (rq_data_dir(rq) == WRITE);
1552 if (do_write && rbd_dev->mapping.read_only) {
1553 __blk_end_request_all(rq, -EROFS);
1557 spin_unlock_irq(q->queue_lock);
1559 down_read(&rbd_dev->header_rwsem);
1561 if (!rbd_dev->exists) {
1562 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1563 up_read(&rbd_dev->header_rwsem);
1564 dout("request for non-existent snapshot");
1565 spin_lock_irq(q->queue_lock);
1566 __blk_end_request_all(rq, -ENXIO);
1570 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1572 up_read(&rbd_dev->header_rwsem);
1574 size = blk_rq_bytes(rq);
1575 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1578 dout("%s 0x%x bytes at 0x%llx\n",
1579 do_write ? "write" : "read",
1580 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1582 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1583 if (num_segs <= 0) {
1584 spin_lock_irq(q->queue_lock);
1585 __blk_end_request_all(rq, num_segs);
1586 ceph_put_snap_context(snapc);
1589 coll = rbd_alloc_coll(num_segs);
1591 spin_lock_irq(q->queue_lock);
1592 __blk_end_request_all(rq, -ENOMEM);
1593 ceph_put_snap_context(snapc);
1599 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1600 unsigned int chain_size;
1601 struct bio *bio_chain;
1603 BUG_ON(limit > (u64) UINT_MAX);
1604 chain_size = (unsigned int) limit;
1605 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1607 kref_get(&coll->kref);
1609 /* Pass a cloned bio chain via an osd request */
1611 bio_chain = bio_chain_clone_range(&bio,
1612 &bio_offset, chain_size,
1615 (void) rbd_do_op(rq, rbd_dev, snapc,
1617 bio_chain, coll, cur_seg);
1619 rbd_coll_end_req_index(rq, coll, cur_seg,
1620 -ENOMEM, chain_size);
1626 kref_put(&coll->kref, rbd_coll_release);
1628 spin_lock_irq(q->queue_lock);
1630 ceph_put_snap_context(snapc);
/*
 * A queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  One exception would be a single-page
 * bio, which we handle later in bio_chain_clone_range().
 */
1639 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1640 struct bio_vec *bvec)
1642 struct rbd_device *rbd_dev = q->queuedata;
1643 sector_t sector_offset;
1644 sector_t sectors_per_obj;
1645 sector_t obj_sector_offset;
	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
1653 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1654 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1655 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1658 * Compute the number of bytes from that offset to the end
1659 * of the object. Account for what's already used by the bio.
1661 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1662 if (ret > bmd->bi_size)
1663 ret -= bmd->bi_size;
1668 * Don't send back more than was asked for. And if the bio
1669 * was empty, let the whole thing through because: "Note
1670 * that a block device *must* allow a single page to be
1671 * added to an empty bio."
1673 rbd_assert(bvec->bv_len <= PAGE_SIZE);
1674 if (ret > (int) bvec->bv_len || !bmd->bi_size)
1675 ret = (int) bvec->bv_len;
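	/*
	 * Concretely, with an object order of 22 an object spans
	 * 1 << (22 - 9) == 8192 sectors.  A bio starting at absolute
	 * sector 12288 sits 4096 sectors into its object, so 4096
	 * sectors (2 MiB) remain; if the bio already carries 1.5 MiB,
	 * at most 0.5 MiB more is allowed, and never more than the
	 * single bio_vec being added.
	 */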
1680 static void rbd_free_disk(struct rbd_device *rbd_dev)
1682 struct gendisk *disk = rbd_dev->disk;
1687 if (disk->flags & GENHD_FL_UP)
1690 blk_cleanup_queue(disk->queue);
1695 * Read the complete header for the given rbd device.
1697 * Returns a pointer to a dynamically-allocated buffer containing
1698 * the complete and validated header. Caller can pass the address
1699 * of a variable that will be filled in with the version of the
1700 * header object at the time it was read.
1702 * Returns a pointer-coded errno if a failure occurs.
1704 static struct rbd_image_header_ondisk *
1705 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1707 struct rbd_image_header_ondisk *ondisk = NULL;
1714 * The complete header will include an array of its 64-bit
1715 * snapshot ids, followed by the names of those snapshots as
1716 * a contiguous block of NUL-terminated strings. Note that
1717 * the number of snapshots could change by the time we read
1718 * it in, in which case we re-read it.
1725 size = sizeof (*ondisk);
1726 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1728 ondisk = kmalloc(size, GFP_KERNEL);
1730 return ERR_PTR(-ENOMEM);
1732 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1733 rbd_dev->header_name,
1735 (char *) ondisk, version);
1739 if (WARN_ON((size_t) ret < size)) {
1741 pr_warning("short header read for image %s"
1742 " (want %zd got %d)\n",
1743 rbd_dev->spec->image_name, size, ret);
1746 if (!rbd_dev_ondisk_valid(ondisk)) {
1748 pr_warning("invalid header for image %s\n",
1749 rbd_dev->spec->image_name);
1753 names_size = le64_to_cpu(ondisk->snap_names_len);
1754 want_count = snap_count;
1755 snap_count = le32_to_cpu(ondisk->snap_count);
1756 } while (snap_count != want_count);
1763 return ERR_PTR(ret);
 * reload the on-disk header
1769 static int rbd_read_header(struct rbd_device *rbd_dev,
1770 struct rbd_image_header *header)
1772 struct rbd_image_header_ondisk *ondisk;
1776 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1778 return PTR_ERR(ondisk);
1779 ret = rbd_header_from_disk(header, ondisk);
1781 header->obj_version = ver;
1787 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1789 struct rbd_snap *snap;
1790 struct rbd_snap *next;
1792 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1793 rbd_remove_snap_dev(snap);
1796 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1800 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1803 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1804 dout("setting size to %llu sectors", (unsigned long long) size);
1805 rbd_dev->mapping.size = (u64) size;
1806 set_capacity(rbd_dev->disk, size);
1810 * only read the first part of the ondisk header, without the snaps info
1812 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1815 struct rbd_image_header h;
1817 ret = rbd_read_header(rbd_dev, &h);
1821 down_write(&rbd_dev->header_rwsem);
1823 /* Update image size, and check for resize of mapped image */
1824 rbd_dev->header.image_size = h.image_size;
1825 rbd_update_mapping_size(rbd_dev);
1827 /* rbd_dev->header.object_prefix shouldn't change */
1828 kfree(rbd_dev->header.snap_sizes);
1829 kfree(rbd_dev->header.snap_names);
1830 /* osd requests may still refer to snapc */
1831 ceph_put_snap_context(rbd_dev->header.snapc);
1834 *hver = h.obj_version;
1835 rbd_dev->header.obj_version = h.obj_version;
1836 rbd_dev->header.image_size = h.image_size;
1837 rbd_dev->header.snapc = h.snapc;
1838 rbd_dev->header.snap_names = h.snap_names;
1839 rbd_dev->header.snap_sizes = h.snap_sizes;
1840 /* Free the extra copy of the object prefix */
1841 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1842 kfree(h.object_prefix);
1844 ret = rbd_dev_snaps_update(rbd_dev);
1846 ret = rbd_dev_snaps_register(rbd_dev);
1848 up_write(&rbd_dev->header_rwsem);
1853 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1857 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1858 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1859 if (rbd_dev->image_format == 1)
1860 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1862 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1863 mutex_unlock(&ctl_mutex);
1868 static int rbd_init_disk(struct rbd_device *rbd_dev)
1870 struct gendisk *disk;
1871 struct request_queue *q;
1874 /* create gendisk info */
1875 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1879 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1881 disk->major = rbd_dev->major;
1882 disk->first_minor = 0;
1883 disk->fops = &rbd_bd_ops;
1884 disk->private_data = rbd_dev;
1887 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1891 /* We use the default size, but let's be explicit about it. */
1892 blk_queue_physical_block_size(q, SECTOR_SIZE);
1894 /* set io sizes to object size */
1895 segment_size = rbd_obj_bytes(&rbd_dev->header);
1896 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1897 blk_queue_max_segment_size(q, segment_size);
1898 blk_queue_io_min(q, segment_size);
1899 blk_queue_io_opt(q, segment_size);
1901 blk_queue_merge_bvec(q, rbd_merge_bvec);
1904 q->queuedata = rbd_dev;
1906 rbd_dev->disk = disk;
1908 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1921 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1923 return container_of(dev, struct rbd_device, dev);
1926 static ssize_t rbd_size_show(struct device *dev,
1927 struct device_attribute *attr, char *buf)
1929 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1932 down_read(&rbd_dev->header_rwsem);
1933 size = get_capacity(rbd_dev->disk);
1934 up_read(&rbd_dev->header_rwsem);
1936 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1940 * Note this shows the features for whatever's mapped, which is not
1941 * necessarily the base image.
1943 static ssize_t rbd_features_show(struct device *dev,
1944 struct device_attribute *attr, char *buf)
1946 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1948 return sprintf(buf, "0x%016llx\n",
1949 (unsigned long long) rbd_dev->mapping.features);
1952 static ssize_t rbd_major_show(struct device *dev,
1953 struct device_attribute *attr, char *buf)
1955 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1957 return sprintf(buf, "%d\n", rbd_dev->major);
1960 static ssize_t rbd_client_id_show(struct device *dev,
1961 struct device_attribute *attr, char *buf)
1963 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1965 return sprintf(buf, "client%lld\n",
1966 ceph_client_id(rbd_dev->rbd_client->client));
1969 static ssize_t rbd_pool_show(struct device *dev,
1970 struct device_attribute *attr, char *buf)
1972 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1974 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1977 static ssize_t rbd_pool_id_show(struct device *dev,
1978 struct device_attribute *attr, char *buf)
1980 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1982 return sprintf(buf, "%llu\n",
1983 (unsigned long long) rbd_dev->spec->pool_id);
1986 static ssize_t rbd_name_show(struct device *dev,
1987 struct device_attribute *attr, char *buf)
1989 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1991 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
1994 static ssize_t rbd_image_id_show(struct device *dev,
1995 struct device_attribute *attr, char *buf)
1997 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1999 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2003 * Shows the name of the currently-mapped snapshot (or
2004 * RBD_SNAP_HEAD_NAME for the base image).
2006 static ssize_t rbd_snap_show(struct device *dev,
2007 struct device_attribute *attr,
2010 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2012 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2015 static ssize_t rbd_image_refresh(struct device *dev,
2016 struct device_attribute *attr,
2020 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2023 ret = rbd_dev_refresh(rbd_dev, NULL);
2025 return ret < 0 ? ret : size;
2028 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2029 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2030 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2031 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2032 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2033 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2034 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2035 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2036 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2037 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2039 static struct attribute *rbd_attrs[] = {
2040 &dev_attr_size.attr,
2041 &dev_attr_features.attr,
2042 &dev_attr_major.attr,
2043 &dev_attr_client_id.attr,
2044 &dev_attr_pool.attr,
2045 &dev_attr_pool_id.attr,
2046 &dev_attr_name.attr,
2047 &dev_attr_image_id.attr,
2048 &dev_attr_current_snap.attr,
2049 &dev_attr_refresh.attr,
2053 static struct attribute_group rbd_attr_group = {
2057 static const struct attribute_group *rbd_attr_groups[] = {
2062 static void rbd_sysfs_dev_release(struct device *dev)
2066 static struct device_type rbd_device_type = {
2068 .groups = rbd_attr_groups,
2069 .release = rbd_sysfs_dev_release,
2077 static ssize_t rbd_snap_size_show(struct device *dev,
2078 struct device_attribute *attr,
2081 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2083 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2086 static ssize_t rbd_snap_id_show(struct device *dev,
2087 struct device_attribute *attr,
2090 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2092 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2095 static ssize_t rbd_snap_features_show(struct device *dev,
2096 struct device_attribute *attr,
2099 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2101 return sprintf(buf, "0x%016llx\n",
2102 (unsigned long long) snap->features);
2105 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2106 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2107 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2109 static struct attribute *rbd_snap_attrs[] = {
2110 &dev_attr_snap_size.attr,
2111 &dev_attr_snap_id.attr,
2112 &dev_attr_snap_features.attr,
2116 static struct attribute_group rbd_snap_attr_group = {
2117 .attrs = rbd_snap_attrs,
2120 static void rbd_snap_dev_release(struct device *dev)
2122 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2127 static const struct attribute_group *rbd_snap_attr_groups[] = {
2128 &rbd_snap_attr_group,
2132 static struct device_type rbd_snap_device_type = {
2133 .groups = rbd_snap_attr_groups,
2134 .release = rbd_snap_dev_release,
2137 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2139 kref_get(&spec->kref);
2144 static void rbd_spec_free(struct kref *kref);
2145 static void rbd_spec_put(struct rbd_spec *spec)
2148 kref_put(&spec->kref, rbd_spec_free);
2151 static struct rbd_spec *rbd_spec_alloc(void)
2153 struct rbd_spec *spec;
2155 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2158 kref_init(&spec->kref);
2160 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2165 static void rbd_spec_free(struct kref *kref)
2167 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2169 kfree(spec->pool_name);
2170 kfree(spec->image_id);
2171 kfree(spec->image_name);
2172 kfree(spec->snap_name);
2176 static bool rbd_snap_registered(struct rbd_snap *snap)
2178 bool ret = snap->dev.type == &rbd_snap_device_type;
2179 bool reg = device_is_registered(&snap->dev);
2181 rbd_assert(!ret ^ reg);
2186 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2188 list_del(&snap->node);
2189 if (device_is_registered(&snap->dev))
2190 device_unregister(&snap->dev);
2193 static int rbd_register_snap_dev(struct rbd_snap *snap,
2194 struct device *parent)
2196 struct device *dev = &snap->dev;
2199 dev->type = &rbd_snap_device_type;
2200 dev->parent = parent;
2201 dev->release = rbd_snap_dev_release;
2202 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2203 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2205 ret = device_register(dev);
2210 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2211 const char *snap_name,
2212 u64 snap_id, u64 snap_size,
2215 struct rbd_snap *snap;
2218 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2220 return ERR_PTR(-ENOMEM);
2223 snap->name = kstrdup(snap_name, GFP_KERNEL);
2228 snap->size = snap_size;
2229 snap->features = snap_features;
2237 return ERR_PTR(ret);
2240 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2241 u64 *snap_size, u64 *snap_features)
2245 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2247 *snap_size = rbd_dev->header.snap_sizes[which];
2248 *snap_features = 0; /* No features for v1 */
2250 /* Skip over names until we find the one we are looking for */
2252 snap_name = rbd_dev->header.snap_names;
2254 snap_name += strlen(snap_name) + 1;
2260 * Get the size and object order for an image snapshot, or if
2261 * snap_id is CEPH_NOSNAP, gets this information for the base
2264 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2265 u8 *order, u64 *snap_size)
2267 __le64 snapid = cpu_to_le64(snap_id);
2272 } __attribute__ ((packed)) size_buf = { 0 };
2274 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2276 (char *) &snapid, sizeof (snapid),
2277 (char *) &size_buf, sizeof (size_buf),
2278 CEPH_OSD_FLAG_READ, NULL);
2279 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2283 *order = size_buf.order;
2284 *snap_size = le64_to_cpu(size_buf.size);
2286 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2287 (unsigned long long) snap_id, (unsigned int) *order,
2288 (unsigned long long) *snap_size);
2293 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2295 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2296 &rbd_dev->header.obj_order,
2297 &rbd_dev->header.image_size);
2300 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2306 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2310 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2311 "rbd", "get_object_prefix",
2313 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2314 CEPH_OSD_FLAG_READ, NULL);
2315 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2318 ret = 0; /* rbd_req_sync_exec() can return positive */
2321 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2322 p + RBD_OBJ_PREFIX_LEN_MAX,
2325 if (IS_ERR(rbd_dev->header.object_prefix)) {
2326 ret = PTR_ERR(rbd_dev->header.object_prefix);
2327 rbd_dev->header.object_prefix = NULL;
2329 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2338 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2341 __le64 snapid = cpu_to_le64(snap_id);
2345 } features_buf = { 0 };
2349 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2350 "rbd", "get_features",
2351 (char *) &snapid, sizeof (snapid),
2352 (char *) &features_buf, sizeof (features_buf),
2353 CEPH_OSD_FLAG_READ, NULL);
2354 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2358 incompat = le64_to_cpu(features_buf.incompat);
2359 if (incompat & ~RBD_FEATURES_ALL)
2362 *snap_features = le64_to_cpu(features_buf.features);
2364 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2365 (unsigned long long) snap_id,
2366 (unsigned long long) *snap_features,
2367 (unsigned long long) le64_to_cpu(features_buf.incompat));
2372 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2374 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2375 &rbd_dev->header.features);
2378 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2387 struct ceph_snap_context *snapc;
2391 * We'll need room for the seq value (maximum snapshot id),
2392 * snapshot count, and array of that many snapshot ids.
2393 * For now we have a fixed upper limit on the number we're
2394 * prepared to receive.
2396 size = sizeof (__le64) + sizeof (__le32) +
2397 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2398 reply_buf = kzalloc(size, GFP_KERNEL);
2402 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2403 "rbd", "get_snapcontext",
2406 CEPH_OSD_FLAG_READ, ver);
2407 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2413 end = (char *) reply_buf + size;
2414 ceph_decode_64_safe(&p, end, seq, out);
2415 ceph_decode_32_safe(&p, end, snap_count, out);
2418 * Make sure the reported number of snapshot ids wouldn't go
2419 * beyond the end of our buffer. But before checking that,
2420 * make sure the computed size of the snapshot context we
2421 * allocate is representable in a size_t.
2423 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2428 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2431 size = sizeof (struct ceph_snap_context) +
2432 snap_count * sizeof (snapc->snaps[0]);
2433 snapc = kmalloc(size, GFP_KERNEL);
2439 atomic_set(&snapc->nref, 1);
2441 snapc->num_snaps = snap_count;
2442 for (i = 0; i < snap_count; i++)
2443 snapc->snaps[i] = ceph_decode_64(&p);
2445 rbd_dev->header.snapc = snapc;
2447 dout(" snap context seq = %llu, snap_count = %u\n",
2448 (unsigned long long) seq, (unsigned int) snap_count);
2456 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2466 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2467 reply_buf = kmalloc(size, GFP_KERNEL);
2469 return ERR_PTR(-ENOMEM);
2471 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2472 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2473 "rbd", "get_snapshot_name",
2474 (char *) &snap_id, sizeof (snap_id),
2476 CEPH_OSD_FLAG_READ, NULL);
2477 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2482 end = (char *) reply_buf + size;
2483 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2484 if (IS_ERR(snap_name)) {
2485 ret = PTR_ERR(snap_name);
2488 dout(" snap_id 0x%016llx snap_name = %s\n",
2489 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2497 return ERR_PTR(ret);
2500 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2501 u64 *snap_size, u64 *snap_features)
2507 snap_id = rbd_dev->header.snapc->snaps[which];
2508 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2510 return ERR_PTR(ret);
2511 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2513 return ERR_PTR(ret);
2515 return rbd_dev_v2_snap_name(rbd_dev, which);
2518 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2519 u64 *snap_size, u64 *snap_features)
2521 if (rbd_dev->image_format == 1)
2522 return rbd_dev_v1_snap_info(rbd_dev, which,
2523 snap_size, snap_features);
2524 if (rbd_dev->image_format == 2)
2525 return rbd_dev_v2_snap_info(rbd_dev, which,
2526 snap_size, snap_features);
2527 return ERR_PTR(-EINVAL);
2530 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2535 down_write(&rbd_dev->header_rwsem);
2537 /* Grab old order first, to see if it changes */
2539 obj_order = rbd_dev->header.obj_order,
2540 ret = rbd_dev_v2_image_size(rbd_dev);
2543 if (rbd_dev->header.obj_order != obj_order) {
2547 rbd_update_mapping_size(rbd_dev);
2549 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2550 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2553 ret = rbd_dev_snaps_update(rbd_dev);
2554 dout("rbd_dev_snaps_update returned %d\n", ret);
2557 ret = rbd_dev_snaps_register(rbd_dev);
2558 dout("rbd_dev_snaps_register returned %d\n", ret);
2560 up_write(&rbd_dev->header_rwsem);
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
2577 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2579 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2580 const u32 snap_count = snapc->num_snaps;
2581 struct list_head *head = &rbd_dev->snaps;
2582 struct list_head *links = head->next;
2585 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2586 while (index < snap_count || links != head) {
2588 struct rbd_snap *snap;
2591 u64 snap_features = 0;
2593 snap_id = index < snap_count ? snapc->snaps[index]
2595 snap = links != head ? list_entry(links, struct rbd_snap, node)
2597 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2599 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2600 struct list_head *next = links->next;
2602 /* Existing snapshot not in the new snap context */
2604 if (rbd_dev->spec->snap_id == snap->id)
2605 rbd_dev->exists = false;
2606 rbd_remove_snap_dev(snap);
2607 dout("%ssnap id %llu has been removed\n",
2608 rbd_dev->spec->snap_id == snap->id ?
2610 (unsigned long long) snap->id);
2612 /* Done with this list entry; advance */
2618 snap_name = rbd_dev_snap_info(rbd_dev, index,
2619 &snap_size, &snap_features);
2620 if (IS_ERR(snap_name))
2621 return PTR_ERR(snap_name);
2623 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2624 (unsigned long long) snap_id);
2625 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2626 struct rbd_snap *new_snap;
2628 /* We haven't seen this snapshot before */
2630 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2631 snap_id, snap_size, snap_features);
2632 if (IS_ERR(new_snap)) {
2633 int err = PTR_ERR(new_snap);
2635 dout(" failed to add dev, error %d\n", err);
2640 /* New goes before existing, or at end of list */
2642 dout(" added dev%s\n", snap ? "" : " at end\n");
2644 list_add_tail(&new_snap->node, &snap->node);
2646 list_add_tail(&new_snap->node, head);
2648 /* Already have this one */
2650 dout(" already present\n");
2652 rbd_assert(snap->size == snap_size);
2653 rbd_assert(!strcmp(snap->name, snap_name));
2654 rbd_assert(snap->features == snap_features);
2656 /* Done with this list entry; advance */
2658 links = links->next;
2661 /* Advance to the next entry in the snapshot context */
2665 dout("%s: done\n", __func__);
2671 * Scan the list of snapshots and register the devices for any that
2672 * have not already been registered.
2674 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2676 struct rbd_snap *snap;
2679 dout("%s called\n", __func__);
2680 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2683 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2684 if (!rbd_snap_registered(snap)) {
2685 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2690 dout("%s: returning %d\n", __func__, ret);
2695 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2700 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2702 dev = &rbd_dev->dev;
2703 dev->bus = &rbd_bus_type;
2704 dev->type = &rbd_device_type;
2705 dev->parent = &rbd_root_dev;
2706 dev->release = rbd_dev_release;
2707 dev_set_name(dev, "%d", rbd_dev->dev_id);
2708 ret = device_register(dev);
2710 mutex_unlock(&ctl_mutex);
2715 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2717 device_unregister(&rbd_dev->dev);
2720 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2725 ret = rbd_req_sync_watch(rbd_dev);
2726 if (ret == -ERANGE) {
2727 rc = rbd_dev_refresh(rbd_dev, NULL);
2731 } while (ret == -ERANGE);
2736 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2739 * Get a unique rbd identifier for the given new rbd_dev, and add
2740 * the rbd_dev to the global list. The minimum rbd id is 1.
2742 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2744 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2746 spin_lock(&rbd_dev_list_lock);
2747 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2748 spin_unlock(&rbd_dev_list_lock);
2749 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2750 (unsigned long long) rbd_dev->dev_id);
2754 * Remove an rbd_dev from the global list, and record that its
2755 * identifier is no longer in use.
2757 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2759 struct list_head *tmp;
2760 int rbd_id = rbd_dev->dev_id;
2763 rbd_assert(rbd_id > 0);
2765 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2766 (unsigned long long) rbd_dev->dev_id);
2767 spin_lock(&rbd_dev_list_lock);
2768 list_del_init(&rbd_dev->node);
2771 * If the id being "put" is not the current maximum, there
2772 * is nothing special we need to do.
2774 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2775 spin_unlock(&rbd_dev_list_lock);
2780 * We need to update the current maximum id. Search the
2781 * list to find out what it is. We're more likely to find
2782 * the maximum at the end, so search the list backward.
2785 list_for_each_prev(tmp, &rbd_dev_list) {
2786 struct rbd_device *rbd_dev;
2788 rbd_dev = list_entry(tmp, struct rbd_device, node);
2789 if (rbd_dev->dev_id > max_id)
2790 max_id = rbd_dev->dev_id;
2792 spin_unlock(&rbd_dev_list_lock);
2795 * The max id could have been updated by rbd_dev_id_get(), in
2796 * which case it now accurately reflects the new maximum.
2797 * Be careful not to overwrite the maximum value in that case.
2800 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2801 dout(" max dev id has been reset\n");
2805 * Skips over white space at *buf, and updates *buf to point to the
2806 * first found non-space character (if any). Returns the length of
2807 * the token (string of non-white space characters) found. Note
2808 * that *buf must be terminated with '\0'.
2810 static inline size_t next_token(const char **buf)
2813 * These are the characters that produce nonzero for
2814 * isspace() in the "C" and "POSIX" locales.
2816 const char *spaces = " \f\n\r\t\v";
2818 *buf += strspn(*buf, spaces); /* Find start of token */
2820 return strcspn(*buf, spaces); /* Return token length */
2824 * Finds the next token in *buf, and if the provided token buffer is
2825 * big enough, copies the found token into it. The result, if
2826 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2827 * must be terminated with '\0' on entry.
2829 * Returns the length of the token found (not including the '\0').
2830 * Return value will be 0 if no token is found, and it will be >=
2831 * token_size if the token would not fit.
2833 * The *buf pointer will be updated to point beyond the end of the
2834 * found token. Note that this occurs even if the token buffer is
2835 * too small to hold it.
2837 static inline size_t copy_token(const char **buf,
2843 len = next_token(buf);
2844 if (len < token_size) {
2845 memcpy(token, *buf, len);
2846 *(token + len) = '\0';
2854 * Finds the next token in *buf, dynamically allocates a buffer big
2855 * enough to hold a copy of it, and copies the token into the new
2856 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2857 * that a duplicate buffer is created even for a zero-length token.
2859 * Returns a pointer to the newly-allocated duplicate, or a null
2860 * pointer if memory for the duplicate was not available. If
2861 * the lenp argument is a non-null pointer, the length of the token
2862 * (not including the '\0') is returned in *lenp.
2864 * If successful, the *buf pointer will be updated to point beyond
2865 * the end of the found token.
2867 * Note: uses GFP_KERNEL for allocation.
2869 static inline char *dup_token(const char **buf, size_t *lenp)
2874 len = next_token(buf);
2875 dup = kmalloc(len + 1, GFP_KERNEL);
2879 memcpy(dup, *buf, len);
2880 *(dup + len) = '\0';
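/*
 * Usage sketch for the token helpers above (not part of the driver;
 * the buffer contents are purely illustrative):
 *
 *	const char *buf = "  rbd myimage";
 *	size_t len;
 *	char *pool;
 *
 *	len = next_token(&buf);		(len == 3, *buf now at "rbd myimage")
 *	pool = dup_token(&buf, &len);	(pool == "rbd", *buf at " myimage")
 *
 * Because dup_token() advances *buf past the token it copied, repeated
 * calls walk the whitespace-separated fields of an add request in order.
 */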
2890 * This fills in the pool_name, image_name, image_name_len, and
2891 * snap_name fields of the given rbd_dev's spec, based on the
2892 * monitor addresses and other options provided via
2893 * /sys/bus/rbd/add. The parsed ceph and rbd option structures are
2894 * returned through the ceph_opts and opts pointers. Returns 0 on
2895 * success, or a negative errno otherwise.
2897 * Note: rbd_dev is assumed to have been initially zero-filled.
2899 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2901 struct ceph_options **ceph_opts,
2902 struct rbd_options **opts)
2905 const char *mon_addrs;
2906 size_t mon_addrs_size;
2908 struct rbd_options *rbd_opts = NULL;
2911 /* The first four tokens are required */
2913 len = next_token(&buf);
2915 return -EINVAL; /* Missing monitor address(es) */
2917 mon_addrs_size = len + 1;
2921 options = dup_token(&buf, NULL);
2925 goto out_err; /* Missing options */
2927 rbd_dev->spec->pool_name = dup_token(&buf, NULL);
2928 if (!rbd_dev->spec->pool_name)
2930 if (!*rbd_dev->spec->pool_name)
2931 goto out_err; /* Missing pool name */
2933 rbd_dev->spec->image_name =
2934 dup_token(&buf, &rbd_dev->spec->image_name_len);
2935 if (!rbd_dev->spec->image_name)
2937 if (!*rbd_dev->spec->image_name)
2938 goto out_err; /* Missing image name */
2941 * Snapshot name is optional; default is to use "-"
2942 * (indicating the head/no snapshot).
2944 len = next_token(&buf);
2946 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2947 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2948 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
2949 ret = -ENAMETOOLONG;
2952 rbd_dev->spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
2953 if (!rbd_dev->spec->snap_name)
2955 memcpy(rbd_dev->spec->snap_name, buf, len);
2956 *(rbd_dev->spec->snap_name + len) = '\0';
2958 /* Initialize all rbd options to the defaults */
2960 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
2964 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
2966 *ceph_opts = ceph_parse_options(options, mon_addrs,
2967 mon_addrs + mon_addrs_size - 1,
2968 parse_rbd_opts_token, rbd_opts);
2970 if (IS_ERR(*ceph_opts)) {
2971 ret = PTR_ERR(*ceph_opts);
2980 kfree(rbd_dev->spec->image_name);
2981 rbd_dev->spec->image_name = NULL;
2982 rbd_dev->spec->image_name_len = 0;
2983 kfree(rbd_dev->spec->pool_name);
2984 rbd_dev->spec->pool_name = NULL;
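/*
 * For reference, the add request parsed above is a single
 * whitespace-separated line written to /sys/bus/rbd/add:
 *
 *	<mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 *
 * for example (all values illustrative):
 *
 *	# echo "192.168.0.1:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * When no snapshot name is supplied, RBD_SNAP_HEAD_NAME ("-") is used,
 * meaning the image head is mapped rather than a snapshot.
 */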
2991 * An rbd format 2 image has a unique identifier, distinct from the
2992 * name given to it by the user. Internally, that identifier is
2993 * what's used to specify the names of objects related to the image.
2995 * A special "rbd id" object is used to map an rbd image name to its
2996 * id. If that object doesn't exist, then there is no v2 rbd image
2997 * with the supplied name.
2999 * This function will record the given rbd_dev's image_id field if
3000 * it can be determined, and in that case will return 0. If any
3001 * errors occur a negative errno will be returned and the rbd_dev's
3002 * image_id field will be unchanged (and should be NULL).
3004 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3013 * First, see if the format 2 image id file exists, and if
3014 * so, get the image's persistent id from it.
3016 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
3017 object_name = kmalloc(size, GFP_NOIO);
3020 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3021 dout("rbd id object name is %s\n", object_name);
3023 /* Response will be an encoded string, which includes a length */
3025 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3026 response = kzalloc(size, GFP_NOIO);
3032 ret = rbd_req_sync_exec(rbd_dev, object_name,
3035 response, RBD_IMAGE_ID_LEN_MAX,
3036 CEPH_OSD_FLAG_READ, NULL);
3037 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3040 ret = 0; /* rbd_req_sync_exec() can return positive */
3043 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3044 p + RBD_IMAGE_ID_LEN_MAX,
3045 &rbd_dev->spec->image_id_len,
3047 if (IS_ERR(rbd_dev->spec->image_id)) {
3048 ret = PTR_ERR(rbd_dev->spec->image_id);
3049 rbd_dev->spec->image_id = NULL;
3051 dout("image_id is %s\n", rbd_dev->spec->image_id);
3060 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3065 /* Version 1 images have no id; empty string is used */
3067 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3068 if (!rbd_dev->spec->image_id)
3070 rbd_dev->spec->image_id_len = 0;
3072 /* Record the header object name for this rbd image. */
3074 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3075 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3076 if (!rbd_dev->header_name) {
3080 sprintf(rbd_dev->header_name, "%s%s",
3081 rbd_dev->spec->image_name, RBD_SUFFIX);
3083 /* Populate rbd image metadata */
3085 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3088 rbd_dev->image_format = 1;
3090 dout("discovered version 1 image, header name is %s\n",
3091 rbd_dev->header_name);
3096 kfree(rbd_dev->header_name);
3097 rbd_dev->header_name = NULL;
3098 kfree(rbd_dev->spec->image_id);
3099 rbd_dev->spec->image_id = NULL;
3104 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3111 * Image id was filled in by the caller. Record the header
3112 * object name for this rbd image.
3114 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
3115 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3116 if (!rbd_dev->header_name)
3118 sprintf(rbd_dev->header_name, "%s%s",
3119 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3121 /* Get the size and object order for the image */
3123 ret = rbd_dev_v2_image_size(rbd_dev);
3127 /* Get the object prefix (a.k.a. block_name) for the image */
3129 ret = rbd_dev_v2_object_prefix(rbd_dev);
3133 /* Get and check the features for the image */
3135 ret = rbd_dev_v2_features(rbd_dev);
3139 /* crypto and compression type aren't (yet) supported for v2 images */
3141 rbd_dev->header.crypt_type = 0;
3142 rbd_dev->header.comp_type = 0;
3144 /* Get the snapshot context, plus the header version */
3146 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3149 rbd_dev->header.obj_version = ver;
3151 rbd_dev->image_format = 2;
3153 dout("discovered version 2 image, header name is %s\n",
3154 rbd_dev->header_name);
3158 kfree(rbd_dev->header_name);
3159 rbd_dev->header_name = NULL;
3160 kfree(rbd_dev->header.object_prefix);
3161 rbd_dev->header.object_prefix = NULL;
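/*
 * To summarize the header object naming used by the two probe paths
 * above: a format 1 image's header object is <image_name> + RBD_SUFFIX,
 * while a format 2 image's header object is RBD_HEADER_PREFIX +
 * <image_id>.  The prefix and suffix strings themselves are defined in
 * rbd_types.h.
 */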
3167 * Probe for the existence of the header object for the given rbd
3168 * device. For format 2 images this includes determining the image id.
3171 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3176 * Get the id from the image id object. If it's not a
3177 * format 2 image, we'll get ENOENT back, and we'll assume
3178 * it's a format 1 image.
3180 ret = rbd_dev_image_id(rbd_dev);
3182 ret = rbd_dev_v1_probe(rbd_dev);
3184 ret = rbd_dev_v2_probe(rbd_dev);
3186 dout("probe failed, returning %d\n", ret);
3191 static ssize_t rbd_add(struct bus_type *bus,
3195 struct rbd_device *rbd_dev = NULL;
3196 struct ceph_options *ceph_opts = NULL;
3197 struct rbd_options *rbd_opts = NULL;
3198 struct ceph_osd_client *osdc;
3201 if (!try_module_get(THIS_MODULE))
3204 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3207 rbd_dev->spec = rbd_spec_alloc();
3211 /* static rbd_device initialization */
3212 spin_lock_init(&rbd_dev->lock);
3213 INIT_LIST_HEAD(&rbd_dev->node);
3214 INIT_LIST_HEAD(&rbd_dev->snaps);
3215 init_rwsem(&rbd_dev->header_rwsem);
3217 /* parse add command */
3218 rc = rbd_add_parse_args(rbd_dev, buf, &ceph_opts, &rbd_opts);
3221 rbd_dev->mapping.read_only = rbd_opts->read_only;
3223 rc = rbd_get_client(rbd_dev, ceph_opts);
3226 ceph_opts = NULL; /* ceph_opts now owned by rbd_dev client */
3229 osdc = &rbd_dev->rbd_client->client->osdc;
3230 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->spec->pool_name);
3232 goto err_out_client;
3233 rbd_dev->spec->pool_id = (u64) rc;
3235 rc = rbd_dev_probe(rbd_dev);
3237 goto err_out_client;
3239 /* no need to lock here, as rbd_dev is not registered yet */
3240 rc = rbd_dev_snaps_update(rbd_dev);
3244 rc = rbd_dev_set_mapping(rbd_dev);
3248 /* generate unique id: find highest unique id, add one */
3249 rbd_dev_id_get(rbd_dev);
3251 /* Fill in the device name, now that we have its id. */
3252 BUILD_BUG_ON(DEV_NAME_LEN
3253 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3254 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3256 /* Get our block major device number. */
3258 rc = register_blkdev(0, rbd_dev->name);
3261 rbd_dev->major = rc;
3263 /* Set up the blkdev mapping. */
3265 rc = rbd_init_disk(rbd_dev);
3267 goto err_out_blkdev;
3269 rc = rbd_bus_add_dev(rbd_dev);
3274 * At this point cleanup in the event of an error is the job
3275 * of the sysfs code (initiated by rbd_bus_del_dev()).
3278 down_write(&rbd_dev->header_rwsem);
3279 rc = rbd_dev_snaps_register(rbd_dev);
3280 up_write(&rbd_dev->header_rwsem);
3284 rc = rbd_init_watch_dev(rbd_dev);
3290 /* Everything's ready. Announce the disk to the world. */
3292 add_disk(rbd_dev->disk);
3294 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3295 (unsigned long long) rbd_dev->mapping.size);
3300 /* this will also clean up rest of rbd_dev stuff */
3302 rbd_bus_del_dev(rbd_dev);
3308 rbd_free_disk(rbd_dev);
3310 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3312 rbd_dev_id_put(rbd_dev);
3314 rbd_remove_all_snaps(rbd_dev);
3316 rbd_header_free(&rbd_dev->header);
3318 kfree(rbd_dev->header_name);
3319 rbd_put_client(rbd_dev);
3322 ceph_destroy_options(ceph_opts);
3325 rbd_spec_put(rbd_dev->spec);
3328 dout("Error adding device %s\n", buf);
3329 module_put(THIS_MODULE);
3331 return (ssize_t) rc;
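/*
 * Note on the error paths above: once rbd_bus_add_dev() has succeeded
 * the device is owned by sysfs, so later failures simply call
 * rbd_bus_del_dev() and let rbd_dev_release() perform the unwinding.
 * Failures before that point unwind explicitly, in roughly the reverse
 * order of the setup steps.
 */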
3334 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3336 struct list_head *tmp;
3337 struct rbd_device *rbd_dev;
3339 spin_lock(&rbd_dev_list_lock);
3340 list_for_each(tmp, &rbd_dev_list) {
3341 rbd_dev = list_entry(tmp, struct rbd_device, node);
3342 if (rbd_dev->dev_id == dev_id) {
3343 spin_unlock(&rbd_dev_list_lock);
3347 spin_unlock(&rbd_dev_list_lock);
3351 static void rbd_dev_release(struct device *dev)
3353 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3355 if (rbd_dev->watch_request) {
3356 struct ceph_client *client = rbd_dev->rbd_client->client;
3358 ceph_osdc_unregister_linger_request(&client->osdc,
3359 rbd_dev->watch_request);
3361 if (rbd_dev->watch_event)
3362 rbd_req_sync_unwatch(rbd_dev);
3364 rbd_put_client(rbd_dev);
3366 /* clean up and free blkdev */
3367 rbd_free_disk(rbd_dev);
3368 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3370 /* release allocated disk header fields */
3371 rbd_header_free(&rbd_dev->header);
3373 /* done with the id, and with the rbd_dev */
3374 kfree(rbd_dev->header_name);
3375 rbd_dev_id_put(rbd_dev);
3376 rbd_spec_put(rbd_dev->spec);
3379 /* release module ref */
3380 module_put(THIS_MODULE);
3383 static ssize_t rbd_remove(struct bus_type *bus,
3387 struct rbd_device *rbd_dev = NULL;
3392 rc = strict_strtoul(buf, 10, &ul);
3396 /* convert to int; abort if we lost anything in the conversion */
3397 target_id = (int) ul;
3398 if (target_id != ul)
3401 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3403 rbd_dev = __rbd_get_dev(target_id);
3409 rbd_remove_all_snaps(rbd_dev);
3410 rbd_bus_del_dev(rbd_dev);
3413 mutex_unlock(&ctl_mutex);
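/*
 * For reference, a mapped device is removed by writing its numeric id
 * (the integer in its rbd<id> device name) to /sys/bus/rbd/remove,
 * for example (id illustrative):
 *
 *	# echo 1 > /sys/bus/rbd/remove
 */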
3419 * create control files in sysfs
3422 static int rbd_sysfs_init(void)
3426 ret = device_register(&rbd_root_dev);
3430 ret = bus_register(&rbd_bus_type);
3432 device_unregister(&rbd_root_dev);
3437 static void rbd_sysfs_cleanup(void)
3439 bus_unregister(&rbd_bus_type);
3440 device_unregister(&rbd_root_dev);
3443 int __init rbd_init(void)
3447 rc = rbd_sysfs_init();
3450 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3454 void __exit rbd_exit(void)
3456 rbd_sysfs_cleanup();
3459 module_init(rbd_init);
3460 module_exit(rbd_exit);
3462 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3463 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3464 MODULE_DESCRIPTION("rados block device");
3466 /* following authorship retained from original osdblk.c */
3467 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3469 MODULE_LICENSE("GPL");