2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
/*
 * NOTE(review): the numeric prefixes below are original-file line
 * numbers; interior lines (including some comment delimiters) are
 * missing from this extract.
 */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
/* Driver name as registered with the block layer and shown in sysfs */
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
/* Snapshot devices are named "snap_<name>"; bound snap names by NAME_MAX */
64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN 1024
/* Mapping the base image (rather than a snapshot) uses this pseudo-name */
71 #define RBD_SNAP_HEAD_NAME "-"
/* Buffer bounds for format-2 image ids and object-name prefixes */
73 #define RBD_IMAGE_ID_LEN_MAX 64
74 #define RBD_OBJ_PREFIX_LEN_MAX 64
78 #define RBD_FEATURE_LAYERING 1
80 /* Features supported by this (client software) implementation. */
82 #define RBD_FEATURES_ALL (0)
85 * An RBD device name will be "rbd#", where the "rbd" comes from
86 * RBD_DRV_NAME above, and # is a unique integer identifier.
87 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88 * enough to hold all possible device names.
90 #define DEV_NAME_LEN 32
/* ceil(bits/3.32) approximation: decimal digits needed for an int, + sign */
91 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
93 #define RBD_READ_ONLY_DEFAULT false
96 * block device image metadata (in-memory version)
/*
 * NOTE(review): interior struct fields are missing from this extract;
 * only what is visible is documented here.
 */
98 struct rbd_image_header {
99 /* These four fields never change for a given rbd image */
106 /* The remaining fields need to be updated occasionally */
/* snapc: snapshot context (snap ids + seq) used for writes; refreshed on header update */
108 struct ceph_snap_context *snapc;
/* rbd_client: one ceph cluster session; shareable between mapped devices */
120 * an instance of the client. multiple devices may share an rbd client.
123 struct ceph_client *client;
125 struct list_head node;
/* completion status of one sub-request within a collection */
129 * a request completion status
131 struct rbd_req_status {
138 * a collection of requests
140 struct rbd_req_coll {
/* trailing variable-length status array (pre-C99 [0] flexible-array idiom) */
144 struct rbd_req_status status[0];
/* one in-flight osd io: blk request, its cloned bio, and owning collection */
148 * a single io request
151 struct request *rq; /* blk layer request */
152 struct bio *bio; /* cloned bio */
153 struct page **pages; /* list of used pages */
156 struct rbd_req_coll *coll;
163 struct list_head node;
/* per-mapped-device state (struct rbd_device; opening lines not in extract) */
178 int dev_id; /* blkdev unique id */
180 int major; /* blkdev assigned major */
181 struct gendisk *disk; /* blkdev's gendisk and rq */
183 u32 image_format; /* Either 1 or 2 */
184 struct rbd_client *rbd_client;
186 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
188 spinlock_t lock; /* queue lock */
190 struct rbd_image_header header;
195 size_t image_name_len;
/* watch registration with the osd, so header changes trigger rbd_watch_cb() */
203 struct ceph_osd_event *watch_event;
204 struct ceph_osd_request *watch_request;
206 /* protects updating the header */
207 struct rw_semaphore header_rwsem;
209 struct rbd_mapping mapping;
211 struct list_head node;
213 /* list of snapshots */
214 struct list_head snaps;
/* Global bookkeeping: each list is guarded by the spinlock declared after it */
220 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
222 static LIST_HEAD(rbd_dev_list); /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
225 static LIST_HEAD(rbd_client_list); /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Forward declarations for snapshot management and sysfs entry points */
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
231 static void rbd_dev_release(struct device *dev);
232 static void rbd_remove_snap_dev(struct rbd_snap *snap);
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* sysfs interface: /sys/bus/rbd/{add,remove}, writable by root only */
239 static struct bus_attribute rbd_bus_attrs[] = {
240 __ATTR(add, S_IWUSR, NULL, rbd_add),
241 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
245 static struct bus_type rbd_bus_type = {
247 .bus_attrs = rbd_bus_attrs,
/* Dummy root device all rbd devices hang off; its release is a no-op */
250 static void rbd_root_dev_release(struct device *dev)
254 static struct device rbd_root_dev = {
256 .release = rbd_root_dev_release,
/*
 * rbd_assert(): when RBD_DEBUG is defined, a failed assertion prints a
 * loud diagnostic (the follow-up action — presumably BUG() — is on a
 * line missing from this extract); otherwise it compiles to nothing.
 */
260 #define rbd_assert(expr) \
261 if (unlikely(!(expr))) { \
262 printk(KERN_ERR "\nAssertion failure in %s() " \
264 "\trbd_assert(%s);\n\n", \
265 __func__, __LINE__, #expr); \
268 #else /* !RBD_DEBUG */
269 # define rbd_assert(expr) ((void) 0)
270 #endif /* !RBD_DEBUG */
272 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
274 return get_device(&rbd_dev->dev);
277 static void rbd_put_dev(struct rbd_device *rbd_dev)
279 put_device(&rbd_dev->dev);
/* Header refresh entry points: generic dispatcher and the format-2 variant */
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
287 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
289 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
292 rbd_get_dev(rbd_dev);
293 set_device_ro(bdev, rbd_dev->mapping.read_only);
298 static int rbd_release(struct gendisk *disk, fmode_t mode)
300 struct rbd_device *rbd_dev = disk->private_data;
302 rbd_put_dev(rbd_dev);
307 static const struct block_device_operations rbd_bd_ops = {
308 .owner = THIS_MODULE,
310 .release = rbd_release,
314 * Initialize an rbd client instance.
/*
 * Allocates an rbd_client, opens a ceph session with the given options,
 * and adds it to the global rbd_client_list — all under ctl_mutex.
 * On success the ceph client owns ceph_opts; the error paths (several
 * lines are missing from this extract) unwind in reverse order.
 */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
319 struct rbd_client *rbdc;
322 dout("rbd_client_create\n");
/* NOTE(review): allocation-failure handling is on lines not in this extract */
323 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
327 kref_init(&rbdc->kref);
328 INIT_LIST_HEAD(&rbdc->node);
330 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
332 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333 if (IS_ERR(rbdc->client))
/* From here on the ceph client owns (and will free) the options */
335 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
337 ret = ceph_open_session(rbdc->client);
/* Session is up: publish the client on the shared list */
341 spin_lock(&rbd_client_list_lock);
342 list_add_tail(&rbdc->node, &rbd_client_list);
343 spin_unlock(&rbd_client_list_lock);
345 mutex_unlock(&ctl_mutex);
347 dout("rbd_client_create created %p\n", rbdc);
/* Error unwinding: destroy the half-built client, then the options */
351 ceph_destroy_client(rbdc->client);
353 mutex_unlock(&ctl_mutex);
357 ceph_destroy_options(ceph_opts);
362 * Find a ceph client with specific addr and configuration. If
363 * found, bump its reference count.
/*
 * Walks rbd_client_list under rbd_client_list_lock comparing options.
 * Clients created with CEPH_OPT_NOSHARE are never shared (the early
 * bail-out body is on a line missing from this extract).  Returns the
 * matched client with its kref taken, or NULL.
 */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
367 struct rbd_client *client_node;
370 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
373 spin_lock(&rbd_client_list_lock);
374 list_for_each_entry(client_node, &rbd_client_list, node) {
/* ceph_compare_options() returns 0 on a match */
375 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376 kref_get(&client_node->kref);
381 spin_unlock(&rbd_client_list_lock);
383 return found ? client_node : NULL;
393 /* string args above */
396 /* Boolean args above */
/*
 * Mount-option token table for match_token(); the enum of Opt_* values
 * it maps to is on lines missing from this extract.
 */
400 static match_table_t rbd_opts_tokens = {
402 /* string args above */
403 {Opt_read_only, "read_only"},
404 {Opt_read_only, "ro"}, /* Alternate spelling */
405 {Opt_read_write, "read_write"},
406 {Opt_read_write, "rw"}, /* Alternate spelling */
407 /* Boolean args above */
/*
 * ceph_parse_options() callback: parse one rbd-specific option token
 * into *rbd_opts.  Tokens are classified by their position relative to
 * the Opt_last_int/string/bool markers, then handled per-token (the
 * switch and its error paths are on lines missing from this extract).
 */
411 static int parse_rbd_opts_token(char *c, void *private)
413 struct rbd_options *rbd_opts = private;
414 substring_t argstr[MAX_OPT_ARGS];
415 int token, intval, ret;
417 token = match_token(c, rbd_opts_tokens, argstr);
421 if (token < Opt_last_int) {
422 ret = match_int(&argstr[0], &intval);
424 pr_err("bad mount option arg (not int) "
428 dout("got int token %d val %d\n", token, intval);
429 } else if (token > Opt_last_int && token < Opt_last_string) {
430 dout("got string token %d val %s\n", token,
432 } else if (token > Opt_last_string && token < Opt_last_bool) {
433 dout("got Boolean token %d\n", token);
435 dout("got token %d\n", token);
/* read_only/read_write are the only options acted on here */
440 rbd_opts->read_only = true;
443 rbd_opts->read_only = false;
453 * Get a ceph client with specific addr and configuration, if one does
454 * not exist create it.
/*
 * Parses the option string (filling rbd_opts via parse_rbd_opts_token),
 * records the parsed rbd options on the device, then either reuses an
 * existing shared client or creates a new one.  Returns 0 or -errno.
 */
456 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
457 size_t mon_addr_len, char *options)
459 struct rbd_options rbd_opts;
460 struct ceph_options *ceph_opts;
461 struct rbd_client *rbdc;
463 /* Initialize all rbd options to the defaults */
465 rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
467 ceph_opts = ceph_parse_options(options, mon_addr,
468 mon_addr + mon_addr_len,
469 parse_rbd_opts_token, &rbd_opts);
470 if (IS_ERR(ceph_opts))
471 return PTR_ERR(ceph_opts);
473 /* Record the parsed rbd options */
475 rbd_dev->mapping.read_only = rbd_opts.read_only;
477 rbdc = rbd_client_find(ceph_opts);
479 /* using an existing client */
/* Found a shared client: our copy of the options is no longer needed */
480 ceph_destroy_options(ceph_opts);
/* Otherwise create a new client; it takes ownership of ceph_opts */
482 rbdc = rbd_client_create(ceph_opts);
484 return PTR_ERR(rbdc);
486 rbd_dev->rbd_client = rbdc;
492 * Destroy ceph client
494 * Caller must hold rbd_client_list_lock.
/*
 * kref release callback: unlink the client from the global list and
 * tear down its ceph session.  NOTE(review): the header comment says
 * the caller must hold rbd_client_list_lock, yet the function takes
 * the same lock itself below — confirm against the full source which
 * convention is current.
 */
496 static void rbd_client_release(struct kref *kref)
498 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
500 dout("rbd_release_client %p\n", rbdc);
501 spin_lock(&rbd_client_list_lock);
502 list_del(&rbdc->node);
503 spin_unlock(&rbd_client_list_lock);
505 ceph_destroy_client(rbdc->client);
510 * Drop reference to ceph client node. If it's not referenced anymore, release
513 static void rbd_put_client(struct rbd_device *rbd_dev)
515 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
516 rbd_dev->rbd_client = NULL;
520 * Destroy requests collection
/*
 * kref release callback for an rbd_req_coll; the actual kfree() of the
 * collection is on a line missing from this extract.
 */
522 static void rbd_coll_release(struct kref *kref)
524 struct rbd_req_coll *coll =
525 container_of(kref, struct rbd_req_coll, kref);
527 dout("rbd_coll_release %p\n", coll);
531 static bool rbd_image_format_valid(u32 image_format)
533 return image_format == 1 || image_format == 2;
/*
 * Sanity-check an on-disk (format 1) image header before trusting any
 * of its fields: magic text, sane object order, and snapshot counts
 * small enough that the in-memory header size fits in a size_t.
 * Returns true when the header looks valid (the return statements are
 * on lines missing from this extract).
 */
536 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
541 /* The header has to start with the magic rbd header text */
542 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
545 /* The bio layer requires at least sector-sized I/O */
547 if (ondisk->options.order < SECTOR_SHIFT)
550 /* If we use u64 in a few spots we may be able to loosen this */
552 if (ondisk->options.order > 8 * sizeof (int) - 1)
556 * The size of a snapshot header has to fit in a size_t, and
557 * that limits the number of snapshots.
559 snap_count = le32_to_cpu(ondisk->snap_count);
560 size = SIZE_MAX - sizeof (struct ceph_snap_context);
561 if (snap_count > size / sizeof (__le64))
565 * Not only that, but the size of the entire the snapshot
566 * header must also be representable in a size_t.
568 size -= snap_count * sizeof (__le64);
569 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
576 * Create a new header structure, translate header format from the on-disk
/*
 * Build the in-memory rbd_image_header from a validated on-disk (v1)
 * header: copy the object prefix, snapshot names and sizes, then
 * allocate and fill a ceph_snap_context.  All allocations are undone
 * on the error path at the bottom.  Several lines (declarations,
 * gotos, returns) are missing from this extract.
 */
579 static int rbd_header_from_disk(struct rbd_image_header *header,
580 struct rbd_image_header_ondisk *ondisk)
587 memset(header, 0, sizeof (*header));
589 snap_count = le32_to_cpu(ondisk->snap_count);
/* object_prefix: NUL-terminated copy of the on-disk (possibly unterminated) field */
591 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
592 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
593 if (!header->object_prefix)
595 memcpy(header->object_prefix, ondisk->object_prefix, len);
596 header->object_prefix[len] = '\0';
599 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
601 /* Save a copy of the snapshot names */
/* Guard: a u64 length from disk must fit in a size_t before kmalloc */
603 if (snap_names_len > (u64) SIZE_MAX)
605 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
606 if (!header->snap_names)
609 * Note that rbd_dev_v1_header_read() guarantees
610 * the ondisk buffer we're working with has
611 * snap_names_len bytes beyond the end of the
612 * snapshot id array, this memcpy() is safe.
614 memcpy(header->snap_names, &ondisk->snaps[snap_count],
617 /* Record each snapshot's size */
619 size = snap_count * sizeof (*header->snap_sizes);
620 header->snap_sizes = kmalloc(size, GFP_KERNEL);
621 if (!header->snap_sizes)
623 for (i = 0; i < snap_count; i++)
624 header->snap_sizes[i] =
625 le64_to_cpu(ondisk->snaps[i].image_size);
/* No snapshots: both arrays stay NULL (names length must be 0 too) */
627 WARN_ON(ondisk->snap_names_len);
628 header->snap_names = NULL;
629 header->snap_sizes = NULL;
632 header->features = 0; /* No features support in v1 images */
633 header->obj_order = ondisk->options.order;
634 header->crypt_type = ondisk->options.crypt_type;
635 header->comp_type = ondisk->options.comp_type;
637 /* Allocate and fill in the snapshot context */
639 header->image_size = le64_to_cpu(ondisk->image_size);
640 size = sizeof (struct ceph_snap_context);
641 size += snap_count * sizeof (header->snapc->snaps[0]);
642 header->snapc = kzalloc(size, GFP_KERNEL);
646 atomic_set(&header->snapc->nref, 1);
647 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
648 header->snapc->num_snaps = snap_count;
649 for (i = 0; i < snap_count; i++)
650 header->snapc->snaps[i] =
651 le64_to_cpu(ondisk->snaps[i].id);
/* Error path: free everything allocated above, NULLing as we go */
656 kfree(header->snap_sizes);
657 header->snap_sizes = NULL;
658 kfree(header->snap_names);
659 header->snap_names = NULL;
660 kfree(header->object_prefix);
661 header->object_prefix = NULL;
/*
 * Look up a snapshot by name on the device's snaps list; on a match,
 * record its id/size/features in the mapping.  The success/-ENOENT
 * returns are on lines missing from this extract.
 */
666 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
669 struct rbd_snap *snap;
671 list_for_each_entry(snap, &rbd_dev->snaps, node) {
672 if (!strcmp(snap_name, snap->name)) {
673 rbd_dev->snap_id = snap->id;
674 rbd_dev->mapping.size = snap->size;
675 rbd_dev->mapping.features = snap->features;
/*
 * Set the device mapping either to the base image (snap_name is the
 * RBD_SNAP_HEAD_NAME sentinel "-") or to a named snapshot; a snapshot
 * mapping is forced read-only.  Error handling around snap_by_name()
 * is on lines missing from this extract.
 */
684 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
/* memcmp with sizeof includes the NUL, so this is an exact-string match */
688 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
689 sizeof (RBD_SNAP_HEAD_NAME))) {
690 rbd_dev->snap_id = CEPH_NOSNAP;
691 rbd_dev->mapping.size = rbd_dev->header.image_size;
692 rbd_dev->mapping.features = rbd_dev->header.features;
695 ret = snap_by_name(rbd_dev, snap_name);
/* Snapshots are immutable: never allow a writable snapshot mapping */
698 rbd_dev->mapping.read_only = true;
700 rbd_dev->snap_name = snap_name;
701 rbd_dev->exists = true;
706 static void rbd_header_free(struct rbd_image_header *header)
708 kfree(header->object_prefix);
709 header->object_prefix = NULL;
710 kfree(header->snap_sizes);
711 header->snap_sizes = NULL;
712 kfree(header->snap_names);
713 header->snap_names = NULL;
714 ceph_put_snap_context(header->snapc);
715 header->snapc = NULL;
/*
 * Build the rados object name ("<prefix>.<segment as 12 hex digits>")
 * for the image segment containing byte offset "offset".  The caller
 * frees the returned buffer.  The NULL-check after kmalloc and the
 * error/return lines are missing from this extract.
 */
718 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
/* GFP_NOIO: may be called on the I/O path, so no recursive I/O for memory */
724 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
727 segment = offset >> rbd_dev->header.obj_order;
728 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
729 rbd_dev->header.object_prefix, segment);
730 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
731 pr_err("error formatting segment name for #%llu (%d)\n",
740 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
742 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
744 return offset & (segment_size - 1);
747 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
748 u64 offset, u64 length)
750 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
752 offset &= segment_size - 1;
754 rbd_assert(length <= U64_MAX - offset);
755 if (offset + length > segment_size)
756 length = segment_size - offset;
/*
 * Number of rbd objects spanned by the byte range [ofs, ofs+len).
 * Guards against ofs+len overflowing u64 (the declarations, the !len
 * case and the overflow return are on lines missing from this
 * extract).
 */
761 static int rbd_get_num_segments(struct rbd_image_header *header,
769 if (len - 1 > U64_MAX - ofs)
772 start_seg = ofs >> header->obj_order;
773 end_seg = (ofs + len - 1) >> header->obj_order;
775 return end_seg - start_seg + 1;
779 * returns the size of an object in the image
781 static u64 rbd_obj_bytes(struct rbd_image_header *header)
783 return 1 << header->obj_order;
/* Drop a reference on every bio in a chain (loop body partly missing here). */
790 static void bio_chain_put(struct bio *chain)
796 chain = chain->bi_next;
802 * zeros a bio chain, starting at specific offset
/*
 * Walk the chain zero-filling data from byte offset start_ofs onward;
 * used to zero the tail of short reads.  The running "pos" update is
 * on a line missing from this extract.
 */
804 static void zero_bio_chain(struct bio *chain, int start_ofs)
813 bio_for_each_segment(bv, chain, i) {
814 if (pos + bv->bv_len > start_ofs) {
/* Only the part of this bvec at/after start_ofs gets zeroed */
815 int remainder = max(start_ofs - pos, 0);
/* kmap with irqs disabled: pages may be highmem and we may be in irq context */
816 buf = bvec_kmap_irq(bv, &flags);
817 memset(buf + remainder, 0,
818 bv->bv_len - remainder);
819 bvec_kunmap_irq(buf, &flags);
824 chain = chain->bi_next;
829 * Clone a portion of a bio, starting at the given byte offset
830 * and continuing for the number of bytes indicated.
/*
 * Returns a new bio covering [offset, offset+len) of bio_src, or NULL
 * on failure.  The interesting work is finding the first and last
 * affected bio_vec entries and trimming the copies at both ends.
 * Several declarations and the final return are on lines missing from
 * this extract.
 */
832 static struct bio *bio_clone_range(struct bio *bio_src,
841 unsigned short end_idx;
845 /* Handle the easy case for the caller */
847 if (!offset && len == bio_src->bi_size)
848 return bio_clone(bio_src, gfpmask);
/* Reject empty, oversized, or out-of-range requests loudly but safely */
850 if (WARN_ON_ONCE(!len))
852 if (WARN_ON_ONCE(len > bio_src->bi_size))
854 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
857 /* Find first affected segment... */
860 __bio_for_each_segment(bv, bio_src, idx, 0) {
861 if (resid < bv->bv_len)
867 /* ...and the last affected segment */
870 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
871 if (resid <= bv->bv_len)
875 vcnt = end_idx - idx + 1;
877 /* Build the clone */
879 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
881 return NULL; /* ENOMEM */
883 bio->bi_bdev = bio_src->bi_bdev;
884 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
885 bio->bi_rw = bio_src->bi_rw;
886 bio->bi_flags |= 1 << BIO_CLONED;
889 * Copy over our part of the bio_vec, then update the first
890 * and last (or only) entries.
892 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
893 vcnt * sizeof (struct bio_vec));
/* Trim the head of the first bvec and the tail of the last */
894 bio->bi_io_vec[0].bv_offset += voff;
896 bio->bi_io_vec[0].bv_len -= voff;
897 bio->bi_io_vec[vcnt - 1].bv_len = resid;
/* Single-bvec clone: its length is exactly the requested len */
899 bio->bi_io_vec[0].bv_len = len;
910 * Clone a portion of a bio chain, starting at the given byte offset
911 * into the first bio in the source chain and continuing for the
912 * number of bytes indicated. The result is another bio chain of
913 * exactly the given length, or a null pointer on error.
915 * The bio_src and offset parameters are both in-out. On entry they
916 * refer to the first source bio and the offset into that bio where
917 * the start of data to be cloned is located.
919 * On return, bio_src is updated to refer to the bio in the source
920 * chain that contains first un-cloned byte, and *offset will
921 * contain the offset of that byte within that bio.
923 static struct bio *bio_chain_clone_range(struct bio **bio_src,
924 unsigned int *offset,
928 struct bio *bi = *bio_src;
929 unsigned int off = *offset;
930 struct bio *chain = NULL;
933 /* Build up a chain of clone bios up to the limit */
935 if (!bi || off >= bi->bi_size || !len)
936 return NULL; /* Nothing to clone */
/* Loop (header missing from extract): clone per-bio slices until len runs out */
940 unsigned int bi_size;
944 goto out_err; /* EINVAL; ran out of bio's */
945 bi_size = min_t(unsigned int, bi->bi_size - off, len);
946 bio = bio_clone_range(bi, off, bi_size, gfpmask);
948 goto out_err; /* ENOMEM */
/* Consumed all of this source bio: advance to the next in the chain */
954 if (off == bi->bi_size) {
/* Error path: release whatever partial chain was built */
965 bio_chain_put(chain);
971 * helpers for osd request op vectors.
/*
 * Allocate a zero-terminated array of num_ops osd ops (hence the +1),
 * setting the first op's code and payload length.  GFP_NOIO because
 * this runs on the I/O path.  The NULL check, op assignment and return
 * are on lines missing from this extract.
 */
973 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
974 int opcode, u32 payload_len)
976 struct ceph_osd_req_op *ops;
978 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
985 * op extent offset and length will be set later on
986 * in calc_raw_layout()
988 ops[0].payload_len = payload_len;
/* Counterpart of rbd_create_rw_ops(); body (kfree) missing from extract */
993 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion of sub-request "index" in a collection and finish
 * (in order) every leading run of completed sub-requests against the
 * blk request.  With no collection it simply ends "len" bytes of rq.
 * Runs under q->queue_lock; several declarations and early-out lines
 * are missing from this extract.
 */
998 static void rbd_coll_end_req_index(struct request *rq,
999 struct rbd_req_coll *coll,
1003 struct request_queue *q;
1006 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1007 coll, index, ret, (unsigned long long) len);
/* No collection: complete the blk request directly */
1013 blk_end_request(rq, ret, len);
1019 spin_lock_irq(q->queue_lock);
1020 coll->status[index].done = 1;
1021 coll->status[index].rc = ret;
1022 coll->status[index].bytes = len;
/* Advance num_done over the contiguous prefix of finished sub-requests */
1023 max = min = coll->num_done;
1024 while (max < coll->total && coll->status[max].done)
1027 for (i = min; i<max; i++) {
1028 __blk_end_request(rq, coll->status[i].rc,
1029 coll->status[i].bytes);
/* Each completed sub-request drops its reference on the collection */
1031 kref_put(&coll->kref, rbd_coll_release);
1033 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper: complete using the indices stored in the request */
1036 static void rbd_coll_end_req(struct rbd_request *req,
1039 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1043 * Send ceph osd request
/*
 * Core request path: wrap a blk request (or a sync operation, rq ==
 * NULL) into a ceph osd request for "object_name", submit it, and —
 * when no callback is supplied — wait for completion, optionally
 * returning the reassert version through *ver.  Error-path gotos and
 * several parameter/declaration lines are missing from this extract.
 */
1045 static int rbd_do_request(struct request *rq,
1046 struct rbd_device *rbd_dev,
1047 struct ceph_snap_context *snapc,
1049 const char *object_name, u64 ofs, u64 len,
1051 struct page **pages,
1054 struct ceph_osd_req_op *ops,
1055 struct rbd_req_coll *coll,
1057 void (*rbd_cb)(struct ceph_osd_request *req,
1058 struct ceph_msg *msg),
1059 struct ceph_osd_request **linger_req,
1062 struct ceph_osd_request *req;
1063 struct ceph_file_layout *layout;
1066 struct timespec mtime = CURRENT_TIME;
1067 struct rbd_request *req_data;
1068 struct ceph_osd_request_head *reqhead;
1069 struct ceph_osd_client *osdc;
1071 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failed: still complete the collection slot so rq finishes */
1074 rbd_coll_end_req_index(rq, coll, coll_index,
1080 req_data->coll = coll;
1081 req_data->coll_index = coll_index;
1084 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1085 object_name, (unsigned long long) ofs,
1086 (unsigned long long) len, coll, coll_index);
1088 osdc = &rbd_dev->rbd_client->client->osdc;
1089 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1090 false, GFP_NOIO, pages, bio);
1096 req->r_callback = rbd_cb;
1099 req_data->bio = bio;
1100 req_data->pages = pages;
1101 req_data->len = len;
1103 req->r_priv = req_data;
1105 reqhead = req->r_request->front.iov_base;
1106 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1108 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1109 req->r_oid_len = strlen(req->r_oid);
/* One rbd object per ceph object: stripe unit == object size, count 1 */
1111 layout = &req->r_file_layout;
1112 memset(layout, 0, sizeof(*layout));
1113 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1114 layout->fl_stripe_count = cpu_to_le32(1);
1115 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1116 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
1117 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1119 rbd_assert(ret == 0);
1121 ceph_osdc_build_request(req, ofs, &len,
1125 req->r_oid, req->r_oid_len);
/* Watch requests linger so the osd keeps notifying after completion */
1128 ceph_osdc_set_request_linger(osdc, req);
1132 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path (no callback): wait here for the reply */
1137 ret = ceph_osdc_wait_request(osdc, req);
1139 *ver = le64_to_cpu(req->r_reassert_version.version);
1140 dout("reassert_ver=%llu\n",
1141 (unsigned long long)
1142 le64_to_cpu(req->r_reassert_version.version));
1143 ceph_osdc_put_request(req);
/* Error path: drop bio chain and request, then fail the collection slot */
1148 bio_chain_put(req_data->bio);
1149 ceph_osdc_put_request(req);
1151 rbd_coll_end_req(req_data, ret, len);
1157 * Ceph osd op callback
/*
 * Async completion for rbd_do_op() requests: decode the reply, map a
 * read of a non-existent object to zeroes (-ENOENT), zero-fill the
 * tail of short reads, then complete the collection slot and release
 * the request.
 */
1159 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1161 struct rbd_request *req_data = req->r_priv;
1162 struct ceph_osd_reply_head *replyhead;
1163 struct ceph_osd_op *op;
1169 replyhead = msg->front.iov_base;
1170 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1171 op = (void *)(replyhead + 1);
1172 rc = le32_to_cpu(replyhead->result);
1173 bytes = le64_to_cpu(op->extent.length);
1174 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1176 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1177 (unsigned long long) bytes, read_op, (int) rc);
/* Reading an object that was never written is a successful all-zero read */
1179 if (rc == -ENOENT && read_op) {
1180 zero_bio_chain(req_data->bio, 0);
/* Short read: the osd returned fewer bytes than asked; zero the rest */
1182 } else if (rc == 0 && read_op && bytes < req_data->len) {
1183 zero_bio_chain(req_data->bio, bytes);
1184 bytes = req_data->len;
1187 rbd_coll_end_req(req_data, rc, bytes);
1190 bio_chain_put(req_data->bio);
1192 ceph_osdc_put_request(req);
/* Minimal callback for fire-and-forget requests: just drop the ref */
1196 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1198 ceph_osdc_put_request(req);
1202 * Do a synchronous ceph osd operation
/*
 * Allocate a page vector for the data, run rbd_do_request() without a
 * callback (so it waits), and for reads copy the result into the
 * caller's "inbound" buffer.  Error handling around the rbd_do_request
 * call is on lines missing from this extract.
 */
1204 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1205 struct ceph_snap_context *snapc,
1208 struct ceph_osd_req_op *ops,
1209 const char *object_name,
1210 u64 ofs, u64 inbound_size,
1212 struct ceph_osd_request **linger_req,
1216 struct page **pages;
1219 rbd_assert(ops != NULL);
1221 num_pages = calc_pages_for(ofs, inbound_size);
1222 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1224 return PTR_ERR(pages);
1226 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1227 object_name, ofs, inbound_size, NULL,
/* On a successful read, ret is the byte count actually transferred */
1237 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1238 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1241 ceph_release_page_vector(pages, num_pages);
1246 * Do an asynchronous ceph osd operation
1248 static int rbd_do_op(struct request *rq,
1249 struct rbd_device *rbd_dev,
1250 struct ceph_snap_context *snapc,
1253 struct rbd_req_coll *coll,
1260 struct ceph_osd_req_op *ops;
1266 seg_name = rbd_segment_name(rbd_dev, ofs);
1269 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1270 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1272 if (rq_data_dir(rq) == WRITE) {
1273 opcode = CEPH_OSD_OP_WRITE;
1274 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1275 snapid = CEPH_NOSNAP;
1276 payload_len = seg_len;
1278 opcode = CEPH_OSD_OP_READ;
1279 flags = CEPH_OSD_FLAG_READ;
1281 snapid = rbd_dev->snap_id;
1286 ops = rbd_create_rw_ops(1, opcode, payload_len);
1290 /* we've taken care of segment sizes earlier when we
1291 cloned the bios. We should never have a segment
1292 truncated at this point */
1293 rbd_assert(seg_len == len);
1295 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1296 seg_name, seg_ofs, seg_len,
1302 rbd_req_cb, 0, NULL);
1304 rbd_destroy_ops(ops);
1311 * Request sync osd read
/* Synchronous read of [ofs, ofs+len) from object_name into buf. */
1313 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1315 const char *object_name,
1320 struct ceph_osd_req_op *ops;
1323 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1327 ret = rbd_req_sync_op(rbd_dev, NULL,
1330 ops, object_name, ofs, len, buf, NULL, ver);
1331 rbd_destroy_ops(ops);
1337 * Request sync osd watch
/*
 * Acknowledge a watch notification so the osd stops re-sending it.
 * Fire-and-forget: completion is rbd_simple_req_cb.  NOTE(review):
 * watch.cookie is assigned notify_id without cpu_to_le64 here, unlike
 * watch.ver just above — confirm against the wire format.
 */
1339 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1343 struct ceph_osd_req_op *ops;
1346 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1350 ops[0].watch.ver = cpu_to_le64(ver);
1351 ops[0].watch.cookie = notify_id;
1352 ops[0].watch.flag = 0;
1354 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1355 rbd_dev->header_name, 0, 0, NULL,
1360 rbd_simple_req_cb, 0, NULL);
1362 rbd_destroy_ops(ops);
/*
 * Watch callback: the header object changed, so refresh our copy of
 * the header/snapshots, then ack the notification.
 */
1366 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1368 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1375 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1376 rbd_dev->header_name, (unsigned long long) notify_id,
1377 (unsigned int) opcode);
1378 rc = rbd_dev_refresh(rbd_dev, &hver);
1380 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1381 " update snaps: %d\n", rbd_dev->major, rc);
/* Ack even on refresh failure so the osd does not keep re-notifying */
1383 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1387 * Request sync osd watch
/*
 * Register a watch on the header object: create the osd event, then
 * issue a WATCH op (flag = 1) carrying the event cookie; the request
 * lingers via watch_request.  On failure the event is cancelled.
 */
1389 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1391 struct ceph_osd_req_op *ops;
1392 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1395 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1399 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1400 (void *)rbd_dev, &rbd_dev->watch_event);
1404 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1405 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1406 ops[0].watch.flag = 1;
1408 ret = rbd_req_sync_op(rbd_dev, NULL,
1410 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1412 rbd_dev->header_name,
1414 &rbd_dev->watch_request, NULL);
1419 rbd_destroy_ops(ops);
/* Error path: tear down the event created above */
1423 ceph_osdc_cancel_event(rbd_dev->watch_event);
1424 rbd_dev->watch_event = NULL;
1426 rbd_destroy_ops(ops);
1431 * Request sync osd unwatch
/*
 * De-register the header watch: WATCH op with flag = 0 and the same
 * cookie, then cancel the local osd event.
 */
1433 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1435 struct ceph_osd_req_op *ops;
1438 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1442 ops[0].watch.ver = 0;
1443 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1444 ops[0].watch.flag = 0;
1446 ret = rbd_req_sync_op(rbd_dev, NULL,
1448 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1450 rbd_dev->header_name,
1451 0, 0, NULL, NULL, NULL);
1454 rbd_destroy_ops(ops);
1455 ceph_osdc_cancel_event(rbd_dev->watch_event);
1456 rbd_dev->watch_event = NULL;
1461 * Synchronous osd object method call
/*
 * Invoke class_name.method_name on object_name via a CALL op, sending
 * "outbound" as input and receiving up to inbound_size bytes back in
 * "inbound".  Returns what rbd_req_sync_op() returns.
 */
1463 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1464 const char *object_name,
1465 const char *class_name,
1466 const char *method_name,
1467 const char *outbound,
1468 size_t outbound_size,
1470 size_t inbound_size,
1474 struct ceph_osd_req_op *ops;
1475 int class_name_len = strlen(class_name);
1476 int method_name_len = strlen(method_name);
1481 * Any input parameters required by the method we're calling
1482 * will be sent along with the class and method names as
1483 * part of the message payload. That data and its size are
1484 * supplied via the indata and indata_len fields (named from
1485 * the perspective of the server side) in the OSD request
1488 payload_size = class_name_len + method_name_len + outbound_size;
1489 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1493 ops[0].cls.class_name = class_name;
1494 ops[0].cls.class_len = (__u8) class_name_len;
1495 ops[0].cls.method_name = method_name;
1496 ops[0].cls.method_len = (__u8) method_name_len;
1497 ops[0].cls.argc = 0;
1498 ops[0].cls.indata = outbound;
1499 ops[0].cls.indata_len = outbound_size;
1501 ret = rbd_req_sync_op(rbd_dev, NULL,
1504 object_name, 0, inbound_size, inbound,
1507 rbd_destroy_ops(ops);
1509 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a request collection with room for num_reqs statuses; the
 * NULL check and return are on lines missing from this extract.
 */
1513 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1515 struct rbd_req_coll *coll =
1516 kzalloc(sizeof(struct rbd_req_coll) +
1517 sizeof(struct rbd_req_status) * num_reqs,
1522 coll->total = num_reqs;
1523 kref_init(&coll->kref);
1528 * block device queue callback
/*
 * Request-queue handler: for each blk request, split it into per-
 * segment osd operations (one collection tracks them all), cloning
 * the bio chain per segment.  The queue lock is dropped around the
 * blocking work and re-taken before fetching the next request.
 * Several declarations, loop headers and continues are on lines
 * missing from this extract.
 */
1530 static void rbd_rq_fn(struct request_queue *q)
1532 struct rbd_device *rbd_dev = q->queuedata;
1535 while ((rq = blk_fetch_request(q))) {
1540 int num_segs, cur_seg = 0;
1541 struct rbd_req_coll *coll;
1542 struct ceph_snap_context *snapc;
1543 unsigned int bio_offset;
1545 dout("fetched request\n");
1547 /* filter out block requests we don't understand */
1548 if ((rq->cmd_type != REQ_TYPE_FS)) {
1549 __blk_end_request_all(rq, 0);
1553 /* deduce our operation (read, write) */
1554 do_write = (rq_data_dir(rq) == WRITE);
1555 if (do_write && rbd_dev->mapping.read_only) {
1556 __blk_end_request_all(rq, -EROFS);
/* Blocking work below: release the queue lock while we build/submit */
1560 spin_unlock_irq(q->queue_lock);
1562 down_read(&rbd_dev->header_rwsem);
/* A mapped snapshot that has been deleted: fail with -ENXIO */
1564 if (!rbd_dev->exists) {
1565 rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
1566 up_read(&rbd_dev->header_rwsem);
1567 dout("request for non-existent snapshot");
1568 spin_lock_irq(q->queue_lock);
1569 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context so writes see a consistent snapshot set */
1573 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1575 up_read(&rbd_dev->header_rwsem);
1577 size = blk_rq_bytes(rq);
1578 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1581 dout("%s 0x%x bytes at 0x%llx\n",
1582 do_write ? "write" : "read",
1583 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1585 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1586 if (num_segs <= 0) {
1587 spin_lock_irq(q->queue_lock);
1588 __blk_end_request_all(rq, num_segs);
1589 ceph_put_snap_context(snapc);
1592 coll = rbd_alloc_coll(num_segs);
1594 spin_lock_irq(q->queue_lock);
1595 __blk_end_request_all(rq, -ENOMEM);
1596 ceph_put_snap_context(snapc);
/* Per-segment loop (header missing): clamp each op to its segment */
1602 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1603 unsigned int chain_size;
1604 struct bio *bio_chain;
1606 BUG_ON(limit > (u64) UINT_MAX);
1607 chain_size = (unsigned int) limit;
1608 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
/* Each submitted segment holds a reference on the collection */
1610 kref_get(&coll->kref);
1612 /* Pass a cloned bio chain via an osd request */
1614 bio_chain = bio_chain_clone_range(&bio,
1615 &bio_offset, chain_size,
1618 (void) rbd_do_op(rq, rbd_dev, snapc,
1620 bio_chain, coll, cur_seg);
/* Clone failed: fail just this segment's slot with -ENOMEM */
1622 rbd_coll_end_req_index(rq, coll, cur_seg,
1623 -ENOMEM, chain_size);
/* Drop our own reference; completions hold theirs */
1629 kref_put(&coll->kref, rbd_coll_release);
1631 spin_lock_irq(q->queue_lock);
1633 ceph_put_snap_context(snapc);
/*
 * rbd_merge_bvec -- merge_bvec_fn registered on the request queue.
 * Returns the number of bytes of @bvec that may be merged into the bio
 * described by @bmd, limiting the bio to the bytes remaining in the
 * current rbd object so a bio never spans two osd objects.
 */
1638  * a queue callback. Makes sure that we don't create a bio that spans across
1639  * multiple osd objects. One exception would be with a single page bios,
1640  * which we handle later at bio_chain_clone_range()
1642 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1643 struct bio_vec *bvec)
1645 struct rbd_device *rbd_dev = q->queuedata;
1646 sector_t sector_offset;
1647 sector_t sectors_per_obj;
1648 sector_t obj_sector_offset;
1652 * Find how far into its rbd object the partition-relative
1653 * bio start sector is to offset relative to the enclosing
/* Convert the partition-relative sector to a whole-device sector */
1656 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1657 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
/* Objects are power-of-two sized, so mask gives offset within object */
1658 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1661 * Compute the number of bytes from that offset to the end
1662 * of the object. Account for what's already used by the bio.
1664 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1665 if (ret > bmd->bi_size)
1666 ret -= bmd->bi_size;
1671 * Don't send back more than was asked for. And if the bio
1672 * was empty, let the whole thing through because: "Note
1673 * that a block device *must* allow a single page to be
1674 * added to an empty bio."
1676 rbd_assert(bvec->bv_len <= PAGE_SIZE);
1677 if (ret > (int) bvec->bv_len || !bmd->bi_size)
1678 ret = (int) bvec->bv_len;
/*
 * rbd_free_disk -- tear down the gendisk and its request queue.
 * Cleans up the queue; the GENHD_FL_UP check guards the
 * unregister step for a disk that was never added.
 */
1683 static void rbd_free_disk(struct rbd_device *rbd_dev)
1685 struct gendisk *disk = rbd_dev->disk;
1690 if (disk->flags & GENHD_FL_UP)
1693 blk_cleanup_queue(disk->queue);
1698  * Read the complete header for the given rbd device.
1700  * Returns a pointer to a dynamically-allocated buffer containing
1701  * the complete and validated header. Caller can pass the address
1702  * of a variable that will be filled in with the version of the
1703  * header object at the time it was read.
1705  * Returns a pointer-coded errno if a failure occurs.
1707 static struct rbd_image_header_ondisk *
1708 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1710 struct rbd_image_header_ondisk *ondisk = NULL;
1717 * The complete header will include an array of its 64-bit
1718 * snapshot ids, followed by the names of those snapshots as
1719 * a contiguous block of NUL-terminated strings. Note that
1720 * the number of snapshots could change by the time we read
1721 * it in, in which case we re-read it.
/* Buffer sized for the fixed header plus one entry per snapshot */
1728 size = sizeof (*ondisk);
1729 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1731 ondisk = kmalloc(size, GFP_KERNEL);
1733 return ERR_PTR(-ENOMEM);
1735 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1736 rbd_dev->header_name,
1738 (char *) ondisk, version);
/* A short read means the on-disk header is smaller than expected */
1742 if (WARN_ON((size_t) ret < size)) {
1744 pr_warning("short header read for image %s"
1745 " (want %zd got %d)\n",
1746 rbd_dev->image_name, size, ret);
1749 if (!rbd_dev_ondisk_valid(ondisk)) {
1751 pr_warning("invalid header for image %s\n",
1752 rbd_dev->image_name);
/* Re-read if the snapshot count changed while we were reading */
1756 names_size = le64_to_cpu(ondisk->snap_names_len);
1757 want_count = snap_count;
1758 snap_count = le32_to_cpu(ondisk->snap_count);
1759 } while (snap_count != want_count);
1766 return ERR_PTR(ret);
1770  * reload the ondisk the header
/*
 * Read the v1 on-disk header and decode it into the in-memory
 * @header, recording the header object version on success.
 * Returns 0 or a negative errno.
 */
1772 static int rbd_read_header(struct rbd_device *rbd_dev,
1773 struct rbd_image_header *header)
1775 struct rbd_image_header_ondisk *ondisk;
1779 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1781 return PTR_ERR(ondisk);
1782 ret = rbd_header_from_disk(header, ondisk);
1784 header->obj_version = ver;
/*
 * rbd_remove_all_snaps -- unregister and remove every snapshot device
 * on the rbd_dev's snaps list.  Uses the _safe iterator because
 * rbd_remove_snap_dev() unlinks each entry as it goes.
 */
1790 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1792 struct rbd_snap *snap;
1793 struct rbd_snap *next;
1795 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1796 rbd_remove_snap_dev(snap);
/*
 * rbd_update_mapping_size -- propagate a new base-image size to the
 * mapping and the gendisk capacity.  Snapshot mappings are skipped:
 * a snapshot's size is fixed at creation time.
 */
1799 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1803 if (rbd_dev->snap_id != CEPH_NOSNAP)
1806 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1807 dout("setting size to %llu sectors", (unsigned long long) size);
1808 rbd_dev->mapping.size = (u64) size;
1809 set_capacity(rbd_dev->disk, size);
1813  * only read the first part of the ondisk header, without the snaps info
/*
 * rbd_dev_v1_refresh -- re-read a format 1 image header and fold the
 * new contents into rbd_dev->header under header_rwsem, then update
 * the snapshot list and register any new snapshot devices.
 * On success *hver (if the caller supplied it -- see note below) holds
 * the new header object version.  Returns 0 or a negative errno.
 */
1815 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1818 struct rbd_image_header h;
1820 ret = rbd_read_header(rbd_dev, &h);
1824 down_write(&rbd_dev->header_rwsem);
1826 /* Update image size, and check for resize of mapped image */
1827 rbd_dev->header.image_size = h.image_size;
1828 rbd_update_mapping_size(rbd_dev);
1830 /* rbd_dev->header.object_prefix shouldn't change */
1831 kfree(rbd_dev->header.snap_sizes);
1832 kfree(rbd_dev->header.snap_names);
1833 /* osd requests may still refer to snapc */
1834 ceph_put_snap_context(rbd_dev->header.snapc);
1837 *hver = h.obj_version;
1838 rbd_dev->header.obj_version = h.obj_version;
1839 rbd_dev->header.image_size = h.image_size;
/* Adopt the freshly-read snapshot context and per-snap arrays */
1840 rbd_dev->header.snapc = h.snapc;
1841 rbd_dev->header.snap_names = h.snap_names;
1842 rbd_dev->header.snap_sizes = h.snap_sizes;
1843 /* Free the extra copy of the object prefix */
1844 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1845 kfree(h.object_prefix);
1847 ret = rbd_dev_snaps_update(rbd_dev);
1849 ret = rbd_dev_snaps_register(rbd_dev);
1851 up_write(&rbd_dev->header_rwsem);
/*
 * rbd_dev_refresh -- format-dispatching header refresh, serialized
 * under ctl_mutex.  Format 1 and format 2 images each have their own
 * refresh implementation.  Returns 0 or a negative errno.
 */
1856 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1860 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1861 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1862 if (rbd_dev->image_format == 1)
1863 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1865 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1866 mutex_unlock(&ctl_mutex);
/*
 * rbd_init_disk -- allocate and configure the gendisk and request
 * queue for the mapped image.  I/O limits are sized to one rbd
 * object so requests split naturally on object boundaries, and the
 * merge_bvec callback enforces the same boundary on bio build-up.
 */
1871 static int rbd_init_disk(struct rbd_device *rbd_dev)
1873 struct gendisk *disk;
1874 struct request_queue *q;
1877 /* create gendisk info */
1878 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1882 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1884 disk->major = rbd_dev->major;
1885 disk->first_minor = 0;
1886 disk->fops = &rbd_bd_ops;
1887 disk->private_data = rbd_dev;
1890 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1894 /* We use the default size, but let's be explicit about it. */
1895 blk_queue_physical_block_size(q, SECTOR_SIZE);
1897 /* set io sizes to object size */
1898 segment_size = rbd_obj_bytes(&rbd_dev->header);
1899 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1900 blk_queue_max_segment_size(q, segment_size);
1901 blk_queue_io_min(q, segment_size);
1902 blk_queue_io_opt(q, segment_size);
1904 blk_queue_merge_bvec(q, rbd_merge_bvec);
/* Back-pointer used by rbd_rq_fn() and rbd_merge_bvec() */
1907 q->queuedata = rbd_dev;
1909 rbd_dev->disk = disk;
1911 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* Map a struct device embedded in an rbd_device back to its container */
1924 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1926 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped image size in bytes (capacity * sector size) */
1929 static ssize_t rbd_size_show(struct device *dev,
1930 struct device_attribute *attr, char *buf)
1932 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Read capacity under header_rwsem so a concurrent resize is coherent */
1935 down_read(&rbd_dev->header_rwsem);
1936 size = get_capacity(rbd_dev->disk);
1937 up_read(&rbd_dev->header_rwsem);
1939 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1943 * Note this shows the features for whatever's mapped, which is not
1944 * necessarily the base image.
1946 static ssize_t rbd_features_show(struct device *dev,
1947 struct device_attribute *attr, char *buf)
1949 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951 return sprintf(buf, "0x%016llx\n",
1952 (unsigned long long) rbd_dev->mapping.features);
/* sysfs "major": block device major number */
1955 static ssize_t rbd_major_show(struct device *dev,
1956 struct device_attribute *attr, char *buf)
1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": global id of the ceph client instance */
1963 static ssize_t rbd_client_id_show(struct device *dev,
1964 struct device_attribute *attr, char *buf)
1966 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968 return sprintf(buf, "client%lld\n",
1969 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool holding the image */
1972 static ssize_t rbd_pool_show(struct device *dev,
1973 struct device_attribute *attr, char *buf)
1975 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1977 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of that pool */
1980 static ssize_t rbd_pool_id_show(struct device *dev,
1981 struct device_attribute *attr, char *buf)
1983 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1985 return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
/* sysfs "name": user-visible image name */
1988 static ssize_t rbd_name_show(struct device *dev,
1989 struct device_attribute *attr, char *buf)
1991 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1993 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "image_id": internal image id (empty for format 1 images) */
1996 static ssize_t rbd_image_id_show(struct device *dev,
1997 struct device_attribute *attr, char *buf)
1999 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2001 return sprintf(buf, "%s\n", rbd_dev->image_id);
2005 * Shows the name of the currently-mapped snapshot (or
2006 * RBD_SNAP_HEAD_NAME for the base image).
2008 static ssize_t rbd_snap_show(struct device *dev,
2009 struct device_attribute *attr,
2012 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2014 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs "refresh" (write-only): force a header re-read */
2017 static ssize_t rbd_image_refresh(struct device *dev,
2018 struct device_attribute *attr,
2022 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2025 ret = rbd_dev_refresh(rbd_dev, NULL);
2027 return ret < 0 ? ret : size;
/* Per-device sysfs attributes; all read-only except "refresh" */
2030 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2031 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2032 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2033 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2034 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2035 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2036 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2037 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2038 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2039 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2041 static struct attribute *rbd_attrs[] = {
2042 &dev_attr_size.attr,
2043 &dev_attr_features.attr,
2044 &dev_attr_major.attr,
2045 &dev_attr_client_id.attr,
2046 &dev_attr_pool.attr,
2047 &dev_attr_pool_id.attr,
2048 &dev_attr_name.attr,
2049 &dev_attr_image_id.attr,
2050 &dev_attr_current_snap.attr,
2051 &dev_attr_refresh.attr,
2055 static struct attribute_group rbd_attr_group = {
2059 static const struct attribute_group *rbd_attr_groups[] = {
/* Empty release: rbd_device lifetime is managed by the bus release hook */
2064 static void rbd_sysfs_dev_release(struct device *dev)
2068 static struct device_type rbd_device_type = {
2070 .groups = rbd_attr_groups,
2071 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size": snapshot size in bytes */
2079 static ssize_t rbd_snap_size_show(struct device *dev,
2080 struct device_attribute *attr,
2083 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2085 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id": snapshot's rados snapshot id */
2088 static ssize_t rbd_snap_id_show(struct device *dev,
2089 struct device_attribute *attr,
2092 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2094 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs "snap_features": feature mask at snapshot time */
2097 static ssize_t rbd_snap_features_show(struct device *dev,
2098 struct device_attribute *attr,
2101 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2103 return sprintf(buf, "0x%016llx\n",
2104 (unsigned long long) snap->features);
2107 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2108 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2109 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2111 static struct attribute *rbd_snap_attrs[] = {
2112 &dev_attr_snap_size.attr,
2113 &dev_attr_snap_id.attr,
2114 &dev_attr_snap_features.attr,
2118 static struct attribute_group rbd_snap_attr_group = {
2119 .attrs = rbd_snap_attrs,
/* Device release for a snapshot: frees the rbd_snap itself */
2122 static void rbd_snap_dev_release(struct device *dev)
2124 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2129 static const struct attribute_group *rbd_snap_attr_groups[] = {
2130 &rbd_snap_attr_group,
2134 static struct device_type rbd_snap_device_type = {
2135 .groups = rbd_snap_attr_groups,
2136 .release = rbd_snap_dev_release,
/*
 * rbd_snap_registered -- true iff the snapshot's device has been
 * registered.  The device type is only assigned at registration time,
 * so the two conditions must agree (asserted below).
 */
2139 static bool rbd_snap_registered(struct rbd_snap *snap)
2141 bool ret = snap->dev.type == &rbd_snap_device_type;
2142 bool reg = device_is_registered(&snap->dev);
2144 rbd_assert(!ret ^ reg);
/* Unlink a snapshot from the device list and drop its sysfs device */
2149 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2151 list_del(&snap->node);
2152 if (device_is_registered(&snap->dev))
2153 device_unregister(&snap->dev);
/*
 * rbd_register_snap_dev -- register the snapshot's device under
 * @parent with the "snap_" name prefix.  Returns device_register()'s
 * result.
 */
2156 static int rbd_register_snap_dev(struct rbd_snap *snap,
2157 struct device *parent)
2159 struct device *dev = &snap->dev;
2162 dev->type = &rbd_snap_device_type;
2163 dev->parent = parent;
2164 dev->release = rbd_snap_dev_release;
2165 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2166 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2168 ret = device_register(dev);
/*
 * __rbd_add_snap_dev -- allocate and populate a new rbd_snap from the
 * given name/id/size/features.  Returns the new snapshot, or a
 * pointer-coded errno on allocation failure.  The caller links it
 * into the device's snapshot list.
 */
2173 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2174 const char *snap_name,
2175 u64 snap_id, u64 snap_size,
2178 struct rbd_snap *snap;
2181 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2183 return ERR_PTR(-ENOMEM);
/* Snapshot keeps its own copy of the name */
2186 snap->name = kstrdup(snap_name, GFP_KERNEL);
2191 snap->size = snap_size;
2192 snap->features = snap_features;
2200 return ERR_PTR(ret);
/*
 * rbd_dev_v1_snap_info -- look up name/size/features of snapshot
 * @which in a format 1 header.  The v1 names live in one contiguous
 * block of NUL-terminated strings, so we walk past @which of them.
 */
2203 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2204 u64 *snap_size, u64 *snap_features)
2208 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2210 *snap_size = rbd_dev->header.snap_sizes[which];
2211 *snap_features = 0; /* No features for v1 */
2213 /* Skip over names until we find the one we are looking for */
2215 snap_name = rbd_dev->header.snap_names;
2217 snap_name += strlen(snap_name) + 1;
2223  * Get the size and object order for an image snapshot, or if
2224  * snap_id is CEPH_NOSNAP, gets this information for the base
/*
 * Issues the "get_size" class method against the header object and
 * decodes the (order, size) reply.  Returns 0 or a negative errno.
 */
2227 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2228 u8 *order, u64 *snap_size)
2230 __le64 snapid = cpu_to_le64(snap_id);
/* Reply layout must match the OSD class method's encoding exactly */
2235 } __attribute__ ((packed)) size_buf = { 0 };
2237 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2239 (char *) &snapid, sizeof (snapid),
2240 (char *) &size_buf, sizeof (size_buf),
2241 CEPH_OSD_FLAG_READ, NULL);
2242 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2246 *order = size_buf.order;
2247 *snap_size = le64_to_cpu(size_buf.size);
2249 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2250 (unsigned long long) snap_id, (unsigned int) *order,
2251 (unsigned long long) *snap_size);
/* Convenience wrapper: size/order of the base image (CEPH_NOSNAP) */
2256 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2258 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2259 &rbd_dev->header.obj_order,
2260 &rbd_dev->header.image_size);
/*
 * rbd_dev_v2_object_prefix -- fetch the image's data-object name
 * prefix via the "get_object_prefix" class method and record the
 * decoded string in the header.  Returns 0 or a negative errno.
 */
2263 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2269 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2273 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2274 "rbd", "get_object_prefix",
2276 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2277 CEPH_OSD_FLAG_READ, NULL);
2278 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2281 ret = 0; /* rbd_req_sync_exec() can return positive */
/* Decode the length-prefixed string into a fresh allocation */
2284 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2285 p + RBD_OBJ_PREFIX_LEN_MAX,
2288 if (IS_ERR(rbd_dev->header.object_prefix)) {
2289 ret = PTR_ERR(rbd_dev->header.object_prefix);
2290 rbd_dev->header.object_prefix = NULL;
2292 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * _rbd_dev_v2_snap_features -- fetch the (features, incompat) pair
 * for a snapshot (or the base image for CEPH_NOSNAP) via the
 * "get_features" class method.  Any incompatible feature bits we do
 * not support cause the image to be rejected.
 */
2301 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2304 __le64 snapid = cpu_to_le64(snap_id);
2308 } features_buf = { 0 };
2312 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2313 "rbd", "get_features",
2314 (char *) &snapid, sizeof (snapid),
2315 (char *) &features_buf, sizeof (features_buf),
2316 CEPH_OSD_FLAG_READ, NULL);
2317 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* Refuse images needing incompatible features we don't implement */
2321 incompat = le64_to_cpu(features_buf.incompat);
2322 if (incompat & ~RBD_FEATURES_ALL)
2325 *snap_features = le64_to_cpu(features_buf.features);
2327 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2328 (unsigned long long) snap_id,
2329 (unsigned long long) *snap_features,
2330 (unsigned long long) le64_to_cpu(features_buf.incompat));
/* Convenience wrapper: features of the base image (CEPH_NOSNAP) */
2335 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2337 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2338 &rbd_dev->header.features)
/*
 * rbd_dev_v2_snap_context -- fetch and decode the snapshot context
 * (seq + snapshot id array) for a format 2 image via the
 * "get_snapcontext" class method, installing the result as
 * rbd_dev->header.snapc.  Returns 0 or a negative errno.
 */
2341 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2350 struct ceph_snap_context *snapc;
2354 * We'll need room for the seq value (maximum snapshot id),
2355 * snapshot count, and array of that many snapshot ids.
2356 * For now we have a fixed upper limit on the number we're
2357 * prepared to receive.
2359 size = sizeof (__le64) + sizeof (__le32) +
2360 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2361 reply_buf = kzalloc(size, GFP_KERNEL);
2365 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2366 "rbd", "get_snapcontext",
2369 CEPH_OSD_FLAG_READ, ver);
2370 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* _safe decode macros jump to "out" on a truncated buffer */
2376 end = (char *) reply_buf + size;
2377 ceph_decode_64_safe(&p, end, seq, out);
2378 ceph_decode_32_safe(&p, end, snap_count, out);
2381 * Make sure the reported number of snapshot ids wouldn't go
2382 * beyond the end of our buffer. But before checking that,
2383 * make sure the computed size of the snapshot context we
2384 * allocate is representable in a size_t.
/* Overflow guard first, then the in-buffer bounds check */
2386 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2391 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2394 size = sizeof (struct ceph_snap_context) +
2395 snap_count * sizeof (snapc->snaps[0]);
2396 snapc = kmalloc(size, GFP_KERNEL);
/* Build the refcounted snap context from the decoded ids */
2402 atomic_set(&snapc->nref, 1);
2404 snapc->num_snaps = snap_count;
2405 for (i = 0; i < snap_count; i++)
2406 snapc->snaps[i] = ceph_decode_64(&p);
2408 rbd_dev->header.snapc = snapc;
2410 dout(" snap context seq = %llu, snap_count = %u\n",
2411 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * rbd_dev_v2_snap_name -- fetch the name of snapshot @which (an index
 * into the header's snap context) via "get_snapshot_name".  Returns a
 * newly-allocated name string or a pointer-coded errno.
 */
2419 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2427 size_t snap_name_len;
2430 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2431 reply_buf = kmalloc(size, GFP_KERNEL);
2433 return ERR_PTR(-ENOMEM);
2435 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2436 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2437 "rbd", "get_snapshot_name",
2438 (char *) &snap_id, sizeof (snap_id),
2440 CEPH_OSD_FLAG_READ, NULL);
2441 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2446 end = (char *) reply_buf + size;
/* Decoded copy is allocated for us; caller owns it */
2448 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2450 if (IS_ERR(snap_name)) {
2451 ret = PTR_ERR(snap_name);
2454 dout(" snap_id 0x%016llx snap_name = %s\n",
2455 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2463 return ERR_PTR(ret);
/*
 * rbd_dev_v2_snap_info -- fill in size and features for snapshot
 * @which and return its (allocated) name, or a pointer-coded errno.
 */
2466 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2467 u64 *snap_size, u64 *snap_features)
2473 snap_id = rbd_dev->header.snapc->snaps[which];
2474 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2476 return ERR_PTR(ret);
2477 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2479 return ERR_PTR(ret);
2481 return rbd_dev_v2_snap_name(rbd_dev, which);
/*
 * rbd_dev_snap_info -- format-dispatching snapshot info lookup.
 * Returns the snapshot name (v1: pointer into the header's name
 * block; v2: freshly allocated) or a pointer-coded errno.
 */
2484 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2485 u64 *snap_size, u64 *snap_features)
2487 if (rbd_dev->image_format == 1)
2488 return rbd_dev_v1_snap_info(rbd_dev, which,
2489 snap_size, snap_features);
2490 if (rbd_dev->image_format == 2)
2491 return rbd_dev_v2_snap_info(rbd_dev, which,
2492 snap_size, snap_features);
2493 return ERR_PTR(-EINVAL);
/*
 * rbd_dev_v2_refresh -- re-read a format 2 image header under
 * header_rwsem: image size (object order must not change), snapshot
 * context, then the snapshot list and device registrations.
 * Returns 0 or a negative errno.
 */
2496 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2501 down_write(&rbd_dev->header_rwsem);
2503 /* Grab old order first, to see if it changes */
2505 obj_order = rbd_dev->header.obj_order,
2506 ret = rbd_dev_v2_image_size(rbd_dev);
/* A changed object order is not supported on a live mapping */
2509 if (rbd_dev->header.obj_order != obj_order) {
2513 rbd_update_mapping_size(rbd_dev);
2515 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2516 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2519 ret = rbd_dev_snaps_update(rbd_dev);
2520 dout("rbd_dev_snaps_update returned %d\n", ret);
2523 ret = rbd_dev_snaps_register(rbd_dev);
2524 dout("rbd_dev_snaps_register returned %d\n", ret);
2526 up_write(&rbd_dev->header_rwsem);
2532  * Scan the rbd device's current snapshot list and compare it to the
2533  * newly-received snapshot context. Remove any existing snapshots
2534  * not present in the new snapshot context. Add a new snapshot for
2535  * any snaphots in the snapshot context not in the current list.
2536  * And verify there are no changes to snapshots we already know
2539  * Assumes the snapshots in the snapshot context are sorted by
2540  * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2541  * are also maintained in that order.)
/*
 * This is a classic two-sorted-sequence merge: walk the snap context
 * (by index) and the existing list (by links) in parallel, removing,
 * inserting, or matching entries as the ids compare.
 */
2543 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2545 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2546 const u32 snap_count = snapc->num_snaps;
2547 struct list_head *head = &rbd_dev->snaps;
2548 struct list_head *links = head->next;
2551 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2552 while (index < snap_count || links != head) {
2554 struct rbd_snap *snap;
2557 u64 snap_features = 0;
/* CEPH_NOSNAP marks exhaustion of the snap-context side */
2559 snap_id = index < snap_count ? snapc->snaps[index]
2561 snap = links != head ? list_entry(links, struct rbd_snap, node)
2563 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2565 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2566 struct list_head *next = links->next;
2568 /* Existing snapshot not in the new snap context */
/* If the *mapped* snapshot vanished, flag the mapping as gone */
2570 if (rbd_dev->snap_id == snap->id)
2571 rbd_dev->exists = false;
2572 rbd_remove_snap_dev(snap);
2573 dout("%ssnap id %llu has been removed\n",
2574 rbd_dev->snap_id == snap->id ? "mapped " : "",
2575 (unsigned long long) snap->id);
2577 /* Done with this list entry; advance */
2583 snap_name = rbd_dev_snap_info(rbd_dev, index,
2584 &snap_size, &snap_features);
2585 if (IS_ERR(snap_name))
2586 return PTR_ERR(snap_name);
2588 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2589 (unsigned long long) snap_id);
2590 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2591 struct rbd_snap *new_snap;
2593 /* We haven't seen this snapshot before */
2595 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2596 snap_id, snap_size, snap_features);
2597 if (IS_ERR(new_snap)) {
2598 int err = PTR_ERR(new_snap);
2600 dout(" failed to add dev, error %d\n", err);
2605 /* New goes before existing, or at end of list */
2607 dout(" added dev%s\n", snap ? "" : " at end\n");
2609 list_add_tail(&new_snap->node, &snap->node);
2611 list_add_tail(&new_snap->node, head);
2613 /* Already have this one */
2615 dout(" already present\n");
/* Known snapshots are immutable; any change is a bug */
2617 rbd_assert(snap->size == snap_size);
2618 rbd_assert(!strcmp(snap->name, snap_name));
2619 rbd_assert(snap->features == snap_features);
2621 /* Done with this list entry; advance */
2623 links = links->next;
2626 /* Advance to the next entry in the snapshot context */
2630 dout("%s: done\n", __func__);
2636  * Scan the list of snapshots and register the devices for any that
2637  * have not already been registered.
2639 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2641 struct rbd_snap *snap;
2644 dout("%s called\n", __func__);
/* Parent device must exist before children can be registered */
2645 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2648 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2649 if (!rbd_snap_registered(snap)) {
2650 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2655 dout("%s: returning %d\n", __func__, ret);
/*
 * rbd_bus_add_dev -- register the rbd_device on the rbd bus under
 * rbd_root_dev, named by its numeric dev id; serialized by ctl_mutex.
 */
2660 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2665 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2667 dev = &rbd_dev->dev;
2668 dev->bus = &rbd_bus_type;
2669 dev->type = &rbd_device_type;
2670 dev->parent = &rbd_root_dev;
2671 dev->release = rbd_dev_release;
2672 dev_set_name(dev, "%d", rbd_dev->dev_id);
2673 ret = device_register(dev);
2675 mutex_unlock(&ctl_mutex);
/* Counterpart of rbd_bus_add_dev() */
2680 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2682 device_unregister(&rbd_dev->dev);
/*
 * rbd_init_watch_dev -- establish the header watch, retrying after a
 * refresh as long as the watch registration returns -ERANGE.
 */
2685 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2690 ret = rbd_req_sync_watch(rbd_dev);
2691 if (ret == -ERANGE) {
2692 rc = rbd_dev_refresh(rbd_dev, NULL);
2696 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids are never < 1 */
2701 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2704  * Get a unique rbd identifier for the given new rbd_dev, and add
2705  * the rbd_dev to the global list. The minimum rbd id is 1.
2707 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2709 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2711 spin_lock(&rbd_dev_list_lock);
2712 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2713 spin_unlock(&rbd_dev_list_lock);
2714 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2715 (unsigned long long) rbd_dev->dev_id);
2719  * Remove an rbd_dev from the global list, and record that its
2720  * identifier is no longer in use.
2722 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2724 struct list_head *tmp;
2725 int rbd_id = rbd_dev->dev_id;
2728 rbd_assert(rbd_id > 0);
2730 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2731 (unsigned long long) rbd_dev->dev_id);
2732 spin_lock(&rbd_dev_list_lock);
2733 list_del_init(&rbd_dev->node);
2736 * If the id being "put" is not the current maximum, there
2737 * is nothing special we need to do.
2739 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2740 spin_unlock(&rbd_dev_list_lock);
2745 * We need to update the current maximum id. Search the
2746 * list to find out what it is. We're more likely to find
2747 * the maximum at the end, so search the list backward.
2750 list_for_each_prev(tmp, &rbd_dev_list) {
/* Shadows the outer rbd_dev intentionally within this scan */
2751 struct rbd_device *rbd_dev;
2753 rbd_dev = list_entry(tmp, struct rbd_device, node);
2754 if (rbd_dev->dev_id > max_id)
2755 max_id = rbd_dev->dev_id;
2757 spin_unlock(&rbd_dev_list_lock);
2760 * The max id could have been updated by rbd_dev_id_get(), in
2761 * which case it now accurately reflects the new maximum.
2762 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers the max if nobody raced a new id in */
2765 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2766 dout(" max dev id has been reset\n");
2770  * Skips over white space at *buf, and updates *buf to point to the
2771  * first found non-space character (if any). Returns the length of
2772  * the token (string of non-white space characters) found. Note
2773  * that *buf must be terminated with '\0'.
2775 static inline size_t next_token(const char **buf)
2778 * These are the characters that produce nonzero for
2779 * isspace() in the "C" and "POSIX" locales.
2781 const char *spaces = " \f\n\r\t\v";
2783 *buf += strspn(*buf, spaces); /* Find start of token */
2785 return strcspn(*buf, spaces); /* Return token length */
2789  * Finds the next token in *buf, and if the provided token buffer is
2790  * big enough, copies the found token into it. The result, if
2791  * copied, is guaranteed to be terminated with '\0'. Note that *buf
2792  * must be terminated with '\0' on entry.
2794  * Returns the length of the token found (not including the '\0').
2795  * Return value will be 0 if no token is found, and it will be >=
2796  * token_size if the token would not fit.
2798  * The *buf pointer will be updated to point beyond the end of the
2799  * found token. Note that this occurs even if the token buffer is
2800  * too small to hold it.
2802 static inline size_t copy_token(const char **buf,
2808 len = next_token(buf);
/* Copy only when the token (plus NUL) fits the caller's buffer */
2809 if (len < token_size) {
2810 memcpy(token, *buf, len);
2811 *(token + len) = '\0';
2819  * Finds the next token in *buf, dynamically allocates a buffer big
2820  * enough to hold a copy of it, and copies the token into the new
2821  * buffer. The copy is guaranteed to be terminated with '\0'. Note
2822  * that a duplicate buffer is created even for a zero-length token.
2824  * Returns a pointer to the newly-allocated duplicate, or a null
2825  * pointer if memory for the duplicate was not available. If
2826  * the lenp argument is a non-null pointer, the length of the token
2827  * (not including the '\0') is returned in *lenp.
2829  * If successful, the *buf pointer will be updated to point beyond
2830  * the end of the found token.
2832  * Note: uses GFP_KERNEL for allocation.
2834 static inline char *dup_token(const char **buf, size_t *lenp)
2839 len = next_token(buf);
/* +1 for the NUL terminator; zero-length tokens still allocate */
2840 dup = kmalloc(len + 1, GFP_KERNEL);
2844 memcpy(dup, *buf, len);
2845 *(dup + len) = '\0';
2855  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2856  * rbd_md_name, and name fields of the given rbd_dev, based on the
2857  * list of monitor addresses and other options provided via
2858  * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2859  * copy of the snapshot name to map if successful, or a
2860  * pointer-coded error otherwise.
2862  * Note: rbd_dev is assumed to have been initially zero-filled.
2864 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2866 const char **mon_addrs,
2867 size_t *mon_addrs_size,
2869 size_t options_size)
2872 char *err_ptr = ERR_PTR(-EINVAL);
2875 /* The first four tokens are required */
/* Token 1: comma-separated monitor address list */
2877 len = next_token(&buf);
2880 *mon_addrs_size = len + 1;
/* Token 2: option string, copied into the caller's buffer */
2885 len = copy_token(&buf, options, options_size);
2886 if (!len || len >= options_size)
/* From here on, failures are allocation failures */
2889 err_ptr = ERR_PTR(-ENOMEM);
2890 rbd_dev->pool_name = dup_token(&buf, NULL);
2891 if (!rbd_dev->pool_name)
2894 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2895 if (!rbd_dev->image_name)
2898 /* Snapshot name is optional; default is to use "head" */
2900 len = next_token(&buf);
2901 if (len > RBD_MAX_SNAP_NAME_LEN) {
2902 err_ptr = ERR_PTR(-ENAMETOOLONG);
2906 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2907 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2909 snap_name = kmalloc(len + 1, GFP_KERNEL);
2912 memcpy(snap_name, buf, len);
2913 *(snap_name + len) = '\0';
/* Error path: undo the allocations made above */
2918 kfree(rbd_dev->image_name);
2919 rbd_dev->image_name = NULL;
2920 rbd_dev->image_name_len = 0;
2921 kfree(rbd_dev->pool_name);
2922 rbd_dev->pool_name = NULL;
2928  * An rbd format 2 image has a unique identifier, distinct from the
2929  * name given to it by the user. Internally, that identifier is
2930  * what's used to specify the names of objects related to the image.
2932  * A special "rbd id" object is used to map an rbd image name to its
2933  * id. If that object doesn't exist, then there is no v2 rbd image
2934  * with the supplied name.
2936  * This function will record the given rbd_dev's image_id field if
2937  * it can be determined, and in that case will return 0. If any
2938  * errors occur a negative errno will be returned and the rbd_dev's
2939  * image_id field will be unchanged (and should be NULL).
2941 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2950 * First, see if the format 2 image id file exists, and if
2951 * so, get the image's persistent id from it.
2953 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2954 object_name = kmalloc(size, GFP_NOIO);
2957 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2958 dout("rbd id object name is %s\n", object_name);
2960 /* Response will be an encoded string, which includes a length */
2962 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2963 response = kzalloc(size, GFP_NOIO);
2969 ret = rbd_req_sync_exec(rbd_dev, object_name,
2972 response, RBD_IMAGE_ID_LEN_MAX,
2973 CEPH_OSD_FLAG_READ, NULL);
2974 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2977 ret = 0; /* rbd_req_sync_exec() can return positive */
/* Decode the length-prefixed id into a fresh allocation */
2980 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2981 p + RBD_IMAGE_ID_LEN_MAX,
2982 &rbd_dev->image_id_len,
2984 if (IS_ERR(rbd_dev->image_id)) {
2985 ret = PTR_ERR(rbd_dev->image_id);
2986 rbd_dev->image_id = NULL;
2988 dout("image_id is %s\n", rbd_dev->image_id);
/*
 * rbd_dev_v1_probe -- probe the device as a format 1 image: record an
 * empty image id, build the "<image>.rbd" header object name, and read
 * the header.  On failure all allocations are rolled back and the
 * rbd_dev fields are reset to NULL.  Returns 0 or a negative errno.
 */
2997 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3002 /* Version 1 images have no id; empty string is used */
3004 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3005 if (!rbd_dev->image_id)
3007 rbd_dev->image_id_len = 0;
3009 /* Record the header object name for this rbd image. */
3011 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3012 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3013 if (!rbd_dev->header_name) {
3017 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3019 /* Populate rbd image metadata */
3021 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3024 rbd_dev->image_format = 1;
3026 dout("discovered version 1 image, header name is %s\n",
3027 rbd_dev->header_name);
/* Error path: release everything allocated above */
3032 kfree(rbd_dev->header_name);
3033 rbd_dev->header_name = NULL;
3034 kfree(rbd_dev->image_id);
3035 rbd_dev->image_id = NULL;
/*
 * Probe the device as a format 2 (v2) rbd image.  The image id must
 * already have been recorded by the caller (rbd_dev_image_id()).
 * Builds the "<RBD_HEADER_PREFIX><image id>" header object name and
 * fetches size/order, object prefix, features, and the snapshot
 * context from the OSDs.  Returns 0 on success or a negative errno,
 * freeing what was allocated here on the error path.
 */
3040 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3047 * Image id was filled in by the caller. Record the header
3048 * object name for this rbd image.
/* sizeof (RBD_HEADER_PREFIX) includes the terminating NUL. */
3050 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3051 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3052 if (!rbd_dev->header_name)
3054 sprintf(rbd_dev->header_name, "%s%s",
3055 RBD_HEADER_PREFIX, rbd_dev->image_id);
3057 /* Get the size and object order for the image */
3059 ret = rbd_dev_v2_image_size(rbd_dev);
3063 /* Get the object prefix (a.k.a. block_name) for the image */
3065 ret = rbd_dev_v2_object_prefix(rbd_dev);
3069 /* Get and check the features for the image */
3071 ret = rbd_dev_v2_features(rbd_dev);
3075 /* crypto and compression type aren't (yet) supported for v2 images */
3077 rbd_dev->header.crypt_type = 0;
3078 rbd_dev->header.comp_type = 0;
3080 /* Get the snapshot context, plus the header version */
3082 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3085 rbd_dev->header.obj_version = ver;
3087 rbd_dev->image_format = 2;
3089 dout("discovered version 2 image, header name is %s\n",
3090 rbd_dev->header_name);
/* Error path: release everything allocated by this function. */
3094 kfree(rbd_dev->header_name);
3095 rbd_dev->header_name = NULL;
3096 kfree(rbd_dev->header.object_prefix);
3097 rbd_dev->header.object_prefix = NULL;
3103 * Probe for the existence of the header object for the given rbd
3104 * device. For format 2 images this includes determining the image
3107 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3112 * Get the id from the image id object. If it's not a
3113 * format 2 image, we'll get ENOENT back, and we'll assume
3114 * it's a format 1 image.
3116 ret = rbd_dev_image_id(rbd_dev);
/* ENOENT from the id lookup selects the v1 path; success selects v2. */
3118 ret = rbd_dev_v1_probe(rbd_dev);
3120 ret = rbd_dev_v2_probe(rbd_dev);
3122 dout("probe failed, returning %d\n", ret);
/*
 * sysfs "add" handler for the rbd bus: parse the user-supplied
 * specification from @buf, connect to the ceph cluster, probe the
 * image (v1 or v2), and register the resulting block device.
 * Returns @count on success or a negative errno.  The error-unwind
 * labels near the bottom release resources in reverse order of
 * acquisition (label names fall outside this excerpt).
 */
3127 static ssize_t rbd_add(struct bus_type *bus,
3132 struct rbd_device *rbd_dev = NULL;
3133 const char *mon_addrs = NULL;
3134 size_t mon_addrs_size = 0;
3135 struct ceph_osd_client *osdc;
/* Pin the module for the lifetime of the device being created. */
3139 if (!try_module_get(THIS_MODULE))
3142 options = kmalloc(count, GFP_KERNEL);
3145 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3149 /* static rbd_device initialization */
3150 spin_lock_init(&rbd_dev->lock);
3151 INIT_LIST_HEAD(&rbd_dev->node);
3152 INIT_LIST_HEAD(&rbd_dev->snaps);
3153 init_rwsem(&rbd_dev->header_rwsem);
3155 /* parse add command */
3156 snap_name = rbd_add_parse_args(rbd_dev, buf,
3157 &mon_addrs, &mon_addrs_size, options, count);
3158 if (IS_ERR(snap_name)) {
3159 rc = PTR_ERR(snap_name);
/* mon_addrs_size includes a trailing NUL; pass only the string length. */
3163 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* Resolve the pool name to its numeric id via the current osdmap. */
3168 osdc = &rbd_dev->rbd_client->client->osdc;
3169 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3171 goto err_out_client;
3172 rbd_dev->pool_id = (u64) rc;
3174 rc = rbd_dev_probe(rbd_dev);
3176 goto err_out_client;
3178 /* no need to lock here, as rbd_dev is not registered yet */
3179 rc = rbd_dev_snaps_update(rbd_dev);
3183 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3187 /* generate unique id: find highest unique id, add one */
3188 rbd_dev_id_get(rbd_dev);
3190 /* Fill in the device name, now that we have its id. */
3191 BUILD_BUG_ON(DEV_NAME_LEN
3192 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3193 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3195 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major number. */
3197 rc = register_blkdev(0, rbd_dev->name);
3200 rbd_dev->major = rc;
3202 /* Set up the blkdev mapping. */
3204 rc = rbd_init_disk(rbd_dev);
3206 goto err_out_blkdev;
3208 rc = rbd_bus_add_dev(rbd_dev);
3213 * At this point cleanup in the event of an error is the job
3214 * of the sysfs code (initiated by rbd_bus_del_dev()).
3217 down_write(&rbd_dev->header_rwsem);
3218 rc = rbd_dev_snaps_register(rbd_dev);
3219 up_write(&rbd_dev->header_rwsem);
3223 rc = rbd_init_watch_dev(rbd_dev);
3227 /* Everything's ready. Announce the disk to the world. */
3229 add_disk(rbd_dev->disk);
3231 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3232 (unsigned long long) rbd_dev->mapping.size);
/* ---- error unwind, reverse order of setup above ---- */
3237 /* this will also clean up rest of rbd_dev stuff */
3239 rbd_bus_del_dev(rbd_dev);
3244 rbd_free_disk(rbd_dev);
3246 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3248 rbd_dev_id_put(rbd_dev);
3250 rbd_remove_all_snaps(rbd_dev);
3252 rbd_header_free(&rbd_dev->header);
3254 kfree(rbd_dev->header_name);
3255 rbd_put_client(rbd_dev);
3256 kfree(rbd_dev->image_id);
3258 kfree(rbd_dev->snap_name);
3259 kfree(rbd_dev->image_name);
3260 kfree(rbd_dev->pool_name);
3265 dout("Error adding device %s\n", buf);
3266 module_put(THIS_MODULE);
3268 return (ssize_t) rc;
/*
 * Look up the rbd device with the given id on the global device
 * list; returns NULL when no match is found.  The list spinlock is
 * held only for the duration of the walk.
 */
3271 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3273 struct list_head *tmp;
3274 struct rbd_device *rbd_dev;
3276 spin_lock(&rbd_dev_list_lock);
3277 list_for_each(tmp, &rbd_dev_list) {
3278 rbd_dev = list_entry(tmp, struct rbd_device, node);
3279 if (rbd_dev->dev_id == dev_id) {
/* Match: drop the lock before returning (return is outside excerpt). */
3280 spin_unlock(&rbd_dev_list_lock);
3284 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: tear down everything rbd_add()
 * built — watch registration, client reference, block device,
 * header, allocated names — and drop the module reference taken
 * at add time.  Runs when the device's last reference is dropped.
 */
3288 static void rbd_dev_release(struct device *dev)
3290 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Cancel any outstanding lingering watch request first. */
3292 if (rbd_dev->watch_request) {
3293 struct ceph_client *client = rbd_dev->rbd_client->client;
3295 ceph_osdc_unregister_linger_request(&client->osdc,
3296 rbd_dev->watch_request);
3298 if (rbd_dev->watch_event)
3299 rbd_req_sync_unwatch(rbd_dev);
3301 rbd_put_client(rbd_dev);
3303 /* clean up and free blkdev */
3304 rbd_free_disk(rbd_dev);
3305 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3307 /* release allocated disk header fields */
3308 rbd_header_free(&rbd_dev->header);
3310 /* done with the id, and with the rbd_dev */
3311 kfree(rbd_dev->snap_name);
3312 kfree(rbd_dev->image_id);
3313 kfree(rbd_dev->header_name);
3314 kfree(rbd_dev->pool_name);
3315 kfree(rbd_dev->image_name);
3316 rbd_dev_id_put(rbd_dev);
3319 /* release module ref */
3320 module_put(THIS_MODULE);
/*
 * sysfs "remove" handler for the rbd bus: parse a decimal device id
 * from @buf, look up the matching device, and tear it down (snaps
 * first, then the device itself via the sysfs/device-model path).
 */
3323 static ssize_t rbd_remove(struct bus_type *bus,
3327 struct rbd_device *rbd_dev = NULL;
3332 rc = strict_strtoul(buf, 10, &ul);
3336 /* convert to int; abort if we lost anything in the conversion */
3337 target_id = (int) ul;
3338 if (target_id != ul)
/* SINGLE_DEPTH_NESTING: ctl_mutex may already be held one level up. */
3341 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3343 rbd_dev = __rbd_get_dev(target_id);
3349 rbd_remove_all_snaps(rbd_dev);
3350 rbd_bus_del_dev(rbd_dev);
3353 mutex_unlock(&ctl_mutex);
3359 * create control files in sysfs
/*
 * Register the rbd root device and bus type; on bus registration
 * failure the root device is unregistered again so the function
 * leaves no partial state behind.
 */
3362 static int rbd_sysfs_init(void)
3366 ret = device_register(&rbd_root_dev);
3370 ret = bus_register(&rbd_bus_type);
3372 device_unregister(&rbd_root_dev);
/* Undo rbd_sysfs_init(): unregister the bus, then the root device. */
3377 static void rbd_sysfs_cleanup(void)
3379 bus_unregister(&rbd_bus_type);
3380 device_unregister(&rbd_root_dev);
/* Module entry point: set up the sysfs bus/root device and announce. */
3383 int __init rbd_init(void)
3387 rc = rbd_sysfs_init();
3390 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit point: remove the sysfs control files. */
3394 void __exit rbd_exit(void)
3396 rbd_sysfs_cleanup();
/* Module entry/exit hookup and standard module metadata. */
3399 module_init(rbd_init);
3400 module_exit(rbd_exit);
3402 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3403 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3404 MODULE_DESCRIPTION("rados block device");
3406 /* following authorship retained from original osdblk.c */
3407 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3409 MODULE_LICENSE("GPL");