2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
/*
 * Compile-time constants for the rbd driver.
 * NOTE(review): this extraction is missing interior lines (the embedded
 * original line numbers are non-contiguous); code is preserved verbatim,
 * comments only added.
 */
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
/* Driver name as registered with the block layer, and a human-readable form */
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
/* Snapshot device names are "snap_<name>"; cap <name> so the whole fits NAME_MAX */
64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN 1024
/* Mapping the image head (no snapshot) is spelled "-" in user input */
71 #define RBD_SNAP_HEAD_NAME "-"
73 #define RBD_IMAGE_ID_LEN_MAX 64
74 #define RBD_OBJ_PREFIX_LEN_MAX 64
/* Feature bits advertised by format-2 images */
78 #define RBD_FEATURE_LAYERING 1
80 /* Features supported by this (client software) implementation. */
82 #define RBD_FEATURES_ALL (0)
85 * An RBD device name will be "rbd#", where the "rbd" comes from
86 * RBD_DRV_NAME above, and # is a unique integer identifier.
87 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88 * enough to hold all possible device names.
90 #define DEV_NAME_LEN 32
91 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/* Images are mapped read-write unless the "ro"/"read_only" option is given */
93 #define RBD_READ_ONLY_DEFAULT false
/*
 * In-memory data structures.
 * NOTE(review): struct bodies below are fragmentary — most field
 * declarations were dropped by the extraction; only the surviving
 * lines are shown, byte-for-byte.
 */
96 * block device image metadata (in-memory version)
98 struct rbd_image_header {
99 /* These four fields never change for a given rbd image */
106 /* The remaining fields need to be updated occasionally */
108 struct ceph_snap_context *snapc;
120 * an instance of the client. multiple devices may share an rbd client.
/* rbd_client: refcounted wrapper around a ceph_client, kept on rbd_client_list */
123 struct ceph_client *client;
125 struct list_head node;
129 * a request completion status
131 struct rbd_req_status {
138 * a collection of requests
140 struct rbd_req_coll {
/* trailing variable-length array of per-request status slots */
144 struct rbd_req_status status[0];
148 * a single io request
/* rbd_request: per-OSD-op bookkeeping tied back to the originating blk request */
151 struct request *rq; /* blk layer request */
152 struct bio *bio; /* cloned bio */
153 struct page **pages; /* list of used pages */
156 struct rbd_req_coll *coll;
163 struct list_head node;
/* rbd_device: one mapped image; fields below are a partial view */
181 int dev_id; /* blkdev unique id */
183 int major; /* blkdev assigned major */
184 struct gendisk *disk; /* blkdev's gendisk and rq */
186 u32 image_format; /* Either 1 or 2 */
187 struct rbd_options rbd_opts;
188 struct rbd_client *rbd_client;
190 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
192 spinlock_t lock; /* queue lock */
194 struct rbd_image_header header;
198 size_t image_name_len;
203 struct ceph_osd_event *watch_event;
204 struct ceph_osd_request *watch_request;
206 /* protects updating the header */
207 struct rw_semaphore header_rwsem;
209 struct rbd_mapping mapping;
211 struct list_head node;
213 /* list of snapshots */
214 struct list_head snaps;
/*
 * Module-wide state, forward declarations, and the sysfs bus/root-device
 * boilerplate. NOTE(review): extraction has dropped lines here (e.g. the
 * rbd_bus_type .name initializer and parts of the rbd_assert macro body).
 */
220 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
/* All mapped devices, and all shared ceph clients, each under its own spinlock */
222 static LIST_HEAD(rbd_dev_list); /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
225 static LIST_HEAD(rbd_client_list); /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
231 static void rbd_dev_release(struct device *dev);
232 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* /sys/bus/rbd control files: write-only "add" and "remove" */
239 static struct bus_attribute rbd_bus_attrs[] = {
240 __ATTR(add, S_IWUSR, NULL, rbd_add),
241 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
245 static struct bus_type rbd_bus_type = {
247 .bus_attrs = rbd_bus_attrs,
/* Empty release: the root device is static, nothing to free */
250 static void rbd_root_dev_release(struct device *dev)
254 static struct device rbd_root_dev = {
256 .release = rbd_root_dev_release,
/* rbd_assert(): loud assertion when RBD_DEBUG is set, no-op otherwise */
260 #define rbd_assert(expr) \
261 if (unlikely(!(expr))) { \
262 printk(KERN_ERR "\nAssertion failure in %s() " \
264 "\trbd_assert(%s);\n\n", \
265 __func__, __LINE__, #expr); \
268 #else /* !RBD_DEBUG */
269 # define rbd_assert(expr) ((void) 0)
270 #endif /* !RBD_DEBUG */
/*
 * Device reference helpers and block_device_operations.
 * NOTE(review): bodies are fragmentary (return paths and braces dropped
 * by the extraction).
 */
/* Take a reference on the rbd_device's embedded struct device */
272 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
274 return get_device(&rbd_dev->dev);
277 static void rbd_put_dev(struct rbd_device *rbd_dev)
279 put_device(&rbd_dev->dev);
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
/*
 * open(): refuse writable opens of a read-only mapping, otherwise pin the
 * device and propagate the ro flag to the block device.
 */
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
287 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
289 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
292 rbd_get_dev(rbd_dev);
293 set_device_ro(bdev, rbd_dev->mapping.read_only);
/* release(): drop the reference taken in rbd_open() */
298 static int rbd_release(struct gendisk *disk, fmode_t mode)
300 struct rbd_device *rbd_dev = disk->private_data;
302 rbd_put_dev(rbd_dev);
307 static const struct block_device_operations rbd_bd_ops = {
308 .owner = THIS_MODULE,
310 .release = rbd_release,
314 * Initialize an rbd client instance.
/*
 * Allocate an rbd_client, create and open a ceph client session for it,
 * and add it to the shared rbd_client_list under ctl_mutex.
 * On success ownership of ceph_opts passes to the ceph client (see the
 * NULL assignment below); error paths destroy it here.
 * NOTE(review): error-label lines were dropped by the extraction.
 */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
319 struct rbd_client *rbdc;
322 dout("rbd_client_create\n");
323 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
327 kref_init(&rbdc->kref);
328 INIT_LIST_HEAD(&rbdc->node);
330 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
332 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333 if (IS_ERR(rbdc->client))
335 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
337 ret = ceph_open_session(rbdc->client);
341 spin_lock(&rbd_client_list_lock);
342 list_add_tail(&rbdc->node, &rbd_client_list);
343 spin_unlock(&rbd_client_list_lock);
345 mutex_unlock(&ctl_mutex);
347 dout("rbd_client_create created %p\n", rbdc);
/* error unwind: destroy the half-built client / options */
351 ceph_destroy_client(rbdc->client);
353 mutex_unlock(&ctl_mutex);
357 ceph_destroy_options(ceph_opts);
362 * Find a ceph client with specific addr and configuration. If
363 * found, bump its reference count.
/* Returns NULL when no shareable client exists (or sharing is disabled) */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
367 struct rbd_client *client_node;
/* CEPH_OPT_NOSHARE forces a private client; never match an existing one */
370 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
373 spin_lock(&rbd_client_list_lock);
374 list_for_each_entry(client_node, &rbd_client_list, node) {
375 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376 kref_get(&client_node->kref);
381 spin_unlock(&rbd_client_list_lock);
383 return found ? client_node : NULL;
/*
 * Mount-option parsing (via lib/parser.h match_token) and client lookup.
 * NOTE(review): the Opt_* enum itself was dropped by the extraction; only
 * the token table and parser survive.
 */
393 /* string args above */
396 /* Boolean args above */
400 static match_table_t rbd_opts_tokens = {
402 /* string args above */
403 {Opt_read_only, "read_only"},
404 {Opt_read_only, "ro"}, /* Alternate spelling */
405 {Opt_read_write, "read_write"},
406 {Opt_read_write, "rw"}, /* Alternate spelling */
407 /* Boolean args above */
/*
 * Callback invoked by ceph_parse_options() for each option it does not
 * recognize itself; classifies the token as int/string/boolean and then
 * applies the rbd-specific ones (read_only/read_write).
 */
411 static int parse_rbd_opts_token(char *c, void *private)
413 struct rbd_options *rbd_opts = private;
414 substring_t argstr[MAX_OPT_ARGS];
415 int token, intval, ret;
417 token = match_token(c, rbd_opts_tokens, argstr);
/* Token ranges: ints < Opt_last_int < strings < Opt_last_string < bools */
421 if (token < Opt_last_int) {
422 ret = match_int(&argstr[0], &intval);
424 pr_err("bad mount option arg (not int) "
428 dout("got int token %d val %d\n", token, intval);
429 } else if (token > Opt_last_int && token < Opt_last_string) {
430 dout("got string token %d val %s\n", token,
432 } else if (token > Opt_last_string && token < Opt_last_bool) {
433 dout("got Boolean token %d\n", token);
435 dout("got token %d\n", token);
440 rbd_opts->read_only = true;
443 rbd_opts->read_only = false;
453 * Get a ceph client with specific addr and configuration, if one does
454 * not exist create it.
456 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
457 size_t mon_addr_len, char *options)
459 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
460 struct ceph_options *ceph_opts;
461 struct rbd_client *rbdc;
463 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
465 ceph_opts = ceph_parse_options(options, mon_addr,
466 mon_addr + mon_addr_len,
467 parse_rbd_opts_token, rbd_opts);
468 if (IS_ERR(ceph_opts))
469 return PTR_ERR(ceph_opts);
/* Reuse a matching client if one exists; otherwise create a new one.
 * rbd_client_create() takes over (or frees) ceph_opts on the create path. */
471 rbdc = rbd_client_find(ceph_opts);
473 /* using an existing client */
474 ceph_destroy_options(ceph_opts);
476 rbdc = rbd_client_create(ceph_opts);
478 return PTR_ERR(rbdc);
480 rbd_dev->rbd_client = rbdc;
486 * Destroy ceph client
488 * Caller must hold rbd_client_list_lock.
/*
 * kref release callback: unlink the client from the shared list and tear
 * down its ceph session.
 * NOTE(review): the comment above ("Caller must hold rbd_client_list_lock")
 * appears inconsistent with the body, which takes the lock itself —
 * likely a stale comment in the original; verify against full source.
 */
490 static void rbd_client_release(struct kref *kref)
492 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
494 dout("rbd_release_client %p\n", rbdc);
495 spin_lock(&rbd_client_list_lock);
496 list_del(&rbdc->node);
497 spin_unlock(&rbd_client_list_lock);
499 ceph_destroy_client(rbdc->client);
504 * Drop reference to ceph client node. If it's not referenced anymore, release
507 static void rbd_put_client(struct rbd_device *rbd_dev)
509 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
510 rbd_dev->rbd_client = NULL;
514 * Destroy requests collection
516 static void rbd_coll_release(struct kref *kref)
518 struct rbd_req_coll *coll =
519 container_of(kref, struct rbd_req_coll, kref);
521 dout("rbd_coll_release %p\n", coll);
/* Only image formats 1 and 2 exist */
525 static bool rbd_image_format_valid(u32 image_format)
527 return image_format == 1 || image_format == 2;
/*
 * Sanity-check an on-disk format-1 header: magic text, object order within
 * [SECTOR_SHIFT, bits(int)), and snapshot count/name sizes that cannot
 * overflow a size_t when the snapshot context is allocated.
 */
530 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
535 /* The header has to start with the magic rbd header text */
536 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
539 /* The bio layer requires at least sector-sized I/O */
541 if (ondisk->options.order < SECTOR_SHIFT)
544 /* If we use u64 in a few spots we may be able to loosen this */
546 if (ondisk->options.order > 8 * sizeof (int) - 1)
550 * The size of a snapshot header has to fit in a size_t, and
551 * that limits the number of snapshots.
553 snap_count = le32_to_cpu(ondisk->snap_count);
554 size = SIZE_MAX - sizeof (struct ceph_snap_context);
555 if (snap_count > size / sizeof (__le64))
559 * Not only that, but the size of the entire the snapshot
560 * header must also be representable in a size_t.
562 size -= snap_count * sizeof (__le64);
563 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
570 * Create a new header structure, translate header format from the on-disk
/*
 * Convert a validated on-disk format-1 header into the in-memory
 * rbd_image_header: duplicate the object prefix, snapshot names and sizes,
 * and build a ceph_snap_context. All allocations are unwound on failure
 * (labels dropped by the extraction).
 */
573 static int rbd_header_from_disk(struct rbd_image_header *header,
574 struct rbd_image_header_ondisk *ondisk)
581 memset(header, 0, sizeof (*header));
583 snap_count = le32_to_cpu(ondisk->snap_count);
/* Copy the object prefix as a NUL-terminated string */
585 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
586 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
587 if (!header->object_prefix)
589 memcpy(header->object_prefix, ondisk->object_prefix, len);
590 header->object_prefix[len] = '\0';
593 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
595 /* Save a copy of the snapshot names */
597 if (snap_names_len > (u64) SIZE_MAX)
599 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
600 if (!header->snap_names)
603 * Note that rbd_dev_v1_header_read() guarantees
604 * the ondisk buffer we're working with has
605 * snap_names_len bytes beyond the end of the
606 * snapshot id array, this memcpy() is safe.
608 memcpy(header->snap_names, &ondisk->snaps[snap_count],
611 /* Record each snapshot's size */
613 size = snap_count * sizeof (*header->snap_sizes);
614 header->snap_sizes = kmalloc(size, GFP_KERNEL);
615 if (!header->snap_sizes)
617 for (i = 0; i < snap_count; i++)
618 header->snap_sizes[i] =
619 le64_to_cpu(ondisk->snaps[i].image_size);
/* else branch (snap_count == 0): no names/sizes arrays */
621 WARN_ON(ondisk->snap_names_len);
622 header->snap_names = NULL;
623 header->snap_sizes = NULL;
626 header->features = 0; /* No features support in v1 images */
627 header->obj_order = ondisk->options.order;
628 header->crypt_type = ondisk->options.crypt_type;
629 header->comp_type = ondisk->options.comp_type;
631 /* Allocate and fill in the snapshot context */
633 header->image_size = le64_to_cpu(ondisk->image_size);
634 size = sizeof (struct ceph_snap_context);
635 size += snap_count * sizeof (header->snapc->snaps[0]);
636 header->snapc = kzalloc(size, GFP_KERNEL);
640 atomic_set(&header->snapc->nref, 1);
641 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
642 header->snapc->num_snaps = snap_count;
643 for (i = 0; i < snap_count; i++)
644 header->snapc->snaps[i] =
645 le64_to_cpu(ondisk->snaps[i].id);
/* error unwind: free everything allocated above, NULLing each pointer */
650 kfree(header->snap_sizes);
651 header->snap_sizes = NULL;
652 kfree(header->snap_names);
653 header->snap_names = NULL;
654 kfree(header->object_prefix);
655 header->object_prefix = NULL;
/*
 * Look up a snapshot by name on the device's snap list; on a match,
 * record its id/size/features in the current mapping.
 */
660 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
663 struct rbd_snap *snap;
665 list_for_each_entry(snap, &rbd_dev->snaps, node) {
666 if (!strcmp(snap_name, snap->name)) {
667 rbd_dev->mapping.snap_id = snap->id;
668 rbd_dev->mapping.size = snap->size;
669 rbd_dev->mapping.features = snap->features;
/*
 * Set up the mapping for either the image head ("-") or a named snapshot.
 * Snapshot mappings are forced read-only; head mappings honor the
 * read_only mount option.
 */
678 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
682 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
683 sizeof (RBD_SNAP_HEAD_NAME))) {
684 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
685 rbd_dev->mapping.size = rbd_dev->header.image_size;
686 rbd_dev->mapping.features = rbd_dev->header.features;
687 rbd_dev->mapping.snap_exists = false;
688 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
691 ret = snap_by_name(rbd_dev, snap_name);
694 rbd_dev->mapping.snap_exists = true;
695 rbd_dev->mapping.read_only = true;
697 rbd_dev->mapping.snap_name = snap_name;
/* Free everything rbd_header_from_disk() allocated */
702 static void rbd_header_free(struct rbd_image_header *header)
704 kfree(header->object_prefix);
705 header->object_prefix = NULL;
706 kfree(header->snap_sizes);
707 header->snap_sizes = NULL;
708 kfree(header->snap_names);
709 header->snap_names = NULL;
710 ceph_put_snap_context(header->snapc);
711 header->snapc = NULL;
/*
 * Segment math: an image is striped across RADOS objects of size
 * 2^obj_order bytes; these helpers map an image offset to an object
 * name, intra-object offset, and clamped length.
 */
/* Build "<object_prefix>.<segment#%012llx>"; caller frees (GFP_NOIO alloc) */
714 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
720 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
723 segment = offset >> rbd_dev->header.obj_order;
724 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
725 rbd_dev->header.object_prefix, segment);
726 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
727 pr_err("error formatting segment name for #%llu (%d)\n",
/* Offset within the containing segment (segment size is a power of two) */
736 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
738 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
740 return offset & (segment_size - 1);
/* Clamp length so [offset, offset+length) stays inside one segment */
743 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
744 u64 offset, u64 length)
746 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
748 offset &= segment_size - 1;
750 rbd_assert(length <= U64_MAX - offset);
751 if (offset + length > segment_size)
752 length = segment_size - offset;
/* Number of segments touched by [ofs, ofs+len); guards u64 overflow first */
757 static int rbd_get_num_segments(struct rbd_image_header *header,
765 if (len - 1 > U64_MAX - ofs)
768 start_seg = ofs >> header->obj_order;
769 end_seg = (ofs + len - 1) >> header->obj_order;
771 return end_seg - start_seg + 1;
775 * returns the size of an object in the image
777 static u64 rbd_obj_bytes(struct rbd_image_header *header)
779 return 1 << header->obj_order;
/* Drop a reference on every bio in a chain */
786 static void bio_chain_put(struct bio *chain)
792 chain = chain->bi_next;
798 * zeros a bio chain, starting at specific offset
/*
 * Walk the chain's bvecs and memset to zero everything at or beyond
 * start_ofs (used to zero-fill short/ENOENT reads).
 */
800 static void zero_bio_chain(struct bio *chain, int start_ofs)
809 bio_for_each_segment(bv, chain, i) {
810 if (pos + bv->bv_len > start_ofs) {
811 int remainder = max(start_ofs - pos, 0);
812 buf = bvec_kmap_irq(bv, &flags);
813 memset(buf + remainder, 0,
814 bv->bv_len - remainder);
815 bvec_kunmap_irq(buf, &flags);
820 chain = chain->bi_next;
825 * bio_chain_clone - clone a chain of bios up to a certain length.
826 * might return a bio_pair that will need to be released.
/*
 * NOTE(review): the fragment below assigns the result of bio_split() to the
 * double-pointer parameter `bp` (line 863) yet dereferences `bp->bio1` at
 * line 867 — with interior lines missing it is unclear whether the original
 * used *bp here; verify against the full source before touching this.
 */
828 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
829 struct bio_pair **bp,
830 int len, gfp_t gfpmask)
832 struct bio *old_chain = *old;
833 struct bio *new_chain = NULL;
838 bio_pair_release(*bp);
842 while (old_chain && (total < len)) {
845 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
848 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
850 if (total + old_chain->bi_size > len) {
854 * this split can only happen with a single paged bio,
855 * split_bio will BUG_ON if this is not the case
857 dout("bio_chain_clone split! total=%d remaining=%d"
859 total, len - total, old_chain->bi_size);
861 /* split the bio. We'll release it either in the next
862 call, or it will have to be released outside */
863 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
867 __bio_clone(tmp, &bp->bio1);
871 __bio_clone(tmp, old_chain);
872 *next = old_chain->bi_next;
882 old_chain = old_chain->bi_next;
884 total += tmp->bi_size;
887 rbd_assert(total == len);
894 dout("bio_chain_clone with err\n");
895 bio_chain_put(new_chain);
900 * helpers for osd request op vectors.
/* Allocate num_ops+1 zeroed ops (extra slot terminates the vector) */
902 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
903 int opcode, u32 payload_len)
905 struct ceph_osd_req_op *ops;
907 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
914 * op extent offset and length will be set later on
915 * in calc_raw_layout()
917 ops[0].payload_len = payload_len;
922 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Request-collection completion: mark slot `index` done and complete, in
 * order, every contiguous run of finished sub-requests against the blk
 * request. Single-slot collections (coll == NULL path, lines dropped)
 * just end the whole request.
 */
927 static void rbd_coll_end_req_index(struct request *rq,
928 struct rbd_req_coll *coll,
932 struct request_queue *q;
935 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
936 coll, index, ret, (unsigned long long) len);
942 blk_end_request(rq, ret, len);
/* Per-queue lock protects the collection status array */
948 spin_lock_irq(q->queue_lock);
949 coll->status[index].done = 1;
950 coll->status[index].rc = ret;
951 coll->status[index].bytes = len;
952 max = min = coll->num_done;
953 while (max < coll->total && coll->status[max].done)
956 for (i = min; i<max; i++) {
957 __blk_end_request(rq, coll->status[i].rc,
958 coll->status[i].bytes);
960 kref_put(&coll->kref, rbd_coll_release);
962 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper using the indices stored in the rbd_request */
965 static void rbd_coll_end_req(struct rbd_request *req,
968 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
972 * Send ceph osd request
/*
 * Core request path: build a ceph_osd_request for one object, compute its
 * raw layout, and start it — synchronously (wait + return version) when no
 * callback is given, asynchronously via rbd_cb otherwise. On failure the
 * sub-request is completed with the error.
 */
974 static int rbd_do_request(struct request *rq,
975 struct rbd_device *rbd_dev,
976 struct ceph_snap_context *snapc,
978 const char *object_name, u64 ofs, u64 len,
983 struct ceph_osd_req_op *ops,
984 struct rbd_req_coll *coll,
986 void (*rbd_cb)(struct ceph_osd_request *req,
987 struct ceph_msg *msg),
988 struct ceph_osd_request **linger_req,
991 struct ceph_osd_request *req;
992 struct ceph_file_layout *layout;
995 struct timespec mtime = CURRENT_TIME;
996 struct rbd_request *req_data;
997 struct ceph_osd_request_head *reqhead;
998 struct ceph_osd_client *osdc;
1000 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1003 rbd_coll_end_req_index(rq, coll, coll_index,
1009 req_data->coll = coll;
1010 req_data->coll_index = coll_index;
1013 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
1014 (unsigned long long) ofs, (unsigned long long) len);
1016 osdc = &rbd_dev->rbd_client->client->osdc;
1017 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1018 false, GFP_NOIO, pages, bio);
1024 req->r_callback = rbd_cb;
1027 req_data->bio = bio;
1028 req_data->pages = pages;
1029 req_data->len = len;
1031 req->r_priv = req_data;
1033 reqhead = req->r_request->front.iov_base;
1034 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1036 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1037 req->r_oid_len = strlen(req->r_oid);
/* One object per "stripe": unit == object size, count == 1 */
1039 layout = &req->r_file_layout;
1040 memset(layout, 0, sizeof(*layout));
1041 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1042 layout->fl_stripe_count = cpu_to_le32(1);
1043 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1044 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1045 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1047 rbd_assert(ret == 0);
1049 ceph_osdc_build_request(req, ofs, &len,
1053 req->r_oid, req->r_oid_len);
1056 ceph_osdc_set_request_linger(osdc, req);
1060 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: wait, capture the reassert version, drop the request */
1065 ret = ceph_osdc_wait_request(osdc, req);
1067 *ver = le64_to_cpu(req->r_reassert_version.version);
1068 dout("reassert_ver=%llu\n",
1069 (unsigned long long)
1070 le64_to_cpu(req->r_reassert_version.version));
1071 ceph_osdc_put_request(req);
/* error unwind */
1076 bio_chain_put(req_data->bio);
1077 ceph_osdc_put_request(req);
1079 rbd_coll_end_req(req_data, ret, len);
1085 * Ceph osd op callback
/*
 * Async completion: decode the OSD reply, zero-fill the bio on ENOENT or
 * short reads (sparse objects read as zeros), then complete the
 * sub-request and drop the bio/request references.
 */
1087 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1089 struct rbd_request *req_data = req->r_priv;
1090 struct ceph_osd_reply_head *replyhead;
1091 struct ceph_osd_op *op;
1097 replyhead = msg->front.iov_base;
1098 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1099 op = (void *)(replyhead + 1);
1100 rc = le32_to_cpu(replyhead->result);
1101 bytes = le64_to_cpu(op->extent.length);
1102 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1104 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1105 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object reads as all zeros; short reads zero the tail */
1107 if (rc == -ENOENT && read_op) {
1108 zero_bio_chain(req_data->bio, 0);
1110 } else if (rc == 0 && read_op && bytes < req_data->len) {
1111 zero_bio_chain(req_data->bio, bytes);
1112 bytes = req_data->len;
1115 rbd_coll_end_req(req_data, rc, bytes);
1118 bio_chain_put(req_data->bio);
1120 ceph_osdc_put_request(req);
/* Fire-and-forget callback used by notify acks: just drop the request */
1124 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1126 ceph_osdc_put_request(req);
1130 * Do a synchronous ceph osd operation
/*
 * Wrap rbd_do_request() with a temporary page vector; for reads, copy the
 * returned data into the caller's `inbound` buffer before releasing the
 * pages.
 */
1132 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1133 struct ceph_snap_context *snapc,
1136 struct ceph_osd_req_op *ops,
1137 const char *object_name,
1138 u64 ofs, u64 inbound_size,
1140 struct ceph_osd_request **linger_req,
1144 struct page **pages;
1147 rbd_assert(ops != NULL);
1149 num_pages = calc_pages_for(ofs, inbound_size);
1150 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1152 return PTR_ERR(pages);
1154 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1155 object_name, ofs, inbound_size, NULL,
1165 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1166 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1169 ceph_release_page_vector(pages, num_pages);
1174 * Do an asynchronous ceph osd operation
/*
 * Issue one async read or write for a single already-segment-aligned chunk
 * of a blk request (the bio chain was clipped to segment boundaries by the
 * caller, hence the seg_len == len assertion).
 */
1176 static int rbd_do_op(struct request *rq,
1177 struct rbd_device *rbd_dev,
1178 struct ceph_snap_context *snapc,
1181 struct rbd_req_coll *coll,
1188 struct ceph_osd_req_op *ops;
1194 seg_name = rbd_segment_name(rbd_dev, ofs);
1197 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1198 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1200 if (rq_data_dir(rq) == WRITE) {
1201 opcode = CEPH_OSD_OP_WRITE;
1202 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1203 snapid = CEPH_NOSNAP;
1204 payload_len = seg_len;
1206 opcode = CEPH_OSD_OP_READ;
1207 flags = CEPH_OSD_FLAG_READ;
1209 snapid = rbd_dev->mapping.snap_id;
1214 ops = rbd_create_rw_ops(1, opcode, payload_len);
1218 /* we've taken care of segment sizes earlier when we
1219 cloned the bios. We should never have a segment
1220 truncated at this point */
1221 rbd_assert(seg_len == len);
1223 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1224 seg_name, seg_ofs, seg_len,
1230 rbd_req_cb, 0, NULL);
1232 rbd_destroy_ops(ops);
1239 * Request sync osd read
/* Synchronous read of `len` bytes at `ofs` from a named object into buf */
1241 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1243 const char *object_name,
1248 struct ceph_osd_req_op *ops;
1251 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1255 ret = rbd_req_sync_op(rbd_dev, NULL,
1258 ops, object_name, ofs, len, buf, NULL, ver);
1259 rbd_destroy_ops(ops);
1265 * Request sync osd watch
/* Acknowledge a watch notification (flag=0 means this is an ack, not a watch) */
1267 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1271 struct ceph_osd_req_op *ops;
1274 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1278 ops[0].watch.ver = cpu_to_le64(ver);
1279 ops[0].watch.cookie = notify_id;
1280 ops[0].watch.flag = 0;
1282 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1283 rbd_dev->header_name, 0, 0, NULL,
1288 rbd_simple_req_cb, 0, NULL);
1290 rbd_destroy_ops(ops);
/*
 * Watch notification callback: the header object changed (e.g. snapshot
 * created), so refresh the in-memory header, then ack the notification.
 */
1294 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1303 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1304 rbd_dev->header_name, (unsigned long long) notify_id,
1305 (unsigned int) opcode);
1306 rc = rbd_dev_refresh(rbd_dev, &hver);
1308 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1309 " update snaps: %d\n", rbd_dev->major, rc);
/* Ack even after a failed refresh so the OSD doesn't keep retrying us */
1311 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1315 * Request sync osd watch
/*
 * Register a lingering watch on the image's header object; creates the OSD
 * event first, and tears it down if the watch op fails (flag=1 = establish).
 */
1317 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1319 struct ceph_osd_req_op *ops;
1320 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1323 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1327 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1328 (void *)rbd_dev, &rbd_dev->watch_event);
1332 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1333 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1334 ops[0].watch.flag = 1;
1336 ret = rbd_req_sync_op(rbd_dev, NULL,
1338 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1340 rbd_dev->header_name,
1342 &rbd_dev->watch_request, NULL);
1347 rbd_destroy_ops(ops);
/* error unwind: cancel the event created above */
1351 ceph_osdc_cancel_event(rbd_dev->watch_event);
1352 rbd_dev->watch_event = NULL;
1354 rbd_destroy_ops(ops);
1359 * Request sync osd unwatch
/* Tear down the header-object watch (flag=0) and cancel its OSD event */
1361 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1363 struct ceph_osd_req_op *ops;
1366 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1370 ops[0].watch.ver = 0;
1371 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1372 ops[0].watch.flag = 0;
1374 ret = rbd_req_sync_op(rbd_dev, NULL,
1376 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378 rbd_dev->header_name,
1379 0, 0, NULL, NULL, NULL);
1382 rbd_destroy_ops(ops);
1383 ceph_osdc_cancel_event(rbd_dev->watch_event);
1384 rbd_dev->watch_event = NULL;
1389 * Synchronous osd object method call
/*
 * Invoke a server-side object class method (CEPH_OSD_OP_CALL): class and
 * method names plus any outbound parameters travel in the op payload;
 * the reply is copied into `inbound` by rbd_req_sync_op().
 */
1391 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1392 const char *object_name,
1393 const char *class_name,
1394 const char *method_name,
1395 const char *outbound,
1396 size_t outbound_size,
1398 size_t inbound_size,
1402 struct ceph_osd_req_op *ops;
1403 int class_name_len = strlen(class_name);
1404 int method_name_len = strlen(method_name);
1409 * Any input parameters required by the method we're calling
1410 * will be sent along with the class and method names as
1411 * part of the message payload. That data and its size are
1412 * supplied via the indata and indata_len fields (named from
1413 * the perspective of the server side) in the OSD request
1416 payload_size = class_name_len + method_name_len + outbound_size;
1417 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1421 ops[0].cls.class_name = class_name;
1422 ops[0].cls.class_len = (__u8) class_name_len;
1423 ops[0].cls.method_name = method_name;
1424 ops[0].cls.method_len = (__u8) method_name_len;
1425 ops[0].cls.argc = 0;
1426 ops[0].cls.indata = outbound;
1427 ops[0].cls.indata_len = outbound_size;
1429 ret = rbd_req_sync_op(rbd_dev, NULL,
1432 object_name, 0, inbound_size, inbound,
1435 rbd_destroy_ops(ops);
1437 dout("cls_exec returned %d\n", ret);
/* Allocate a zeroed collection sized for num_reqs sub-request status slots */
1441 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1443 struct rbd_req_coll *coll =
1444 kzalloc(sizeof(struct rbd_req_coll) +
1445 sizeof(struct rbd_req_status) * num_reqs,
1450 coll->total = num_reqs;
1451 kref_init(&coll->kref);
1456 * block device queue callback
/*
 * Main request-queue handler: pull blk requests, reject non-FS and
 * write-to-read-only requests, snapshot the snap context under
 * header_rwsem, then split each request along object-segment boundaries
 * into cloned bio chains and issue one async OSD op per segment.
 * The queue lock is dropped while talking to the OSD layer and retaken
 * before ending requests. NOTE(review): the per-segment inner loop and
 * several error paths were dropped by the extraction.
 */
1458 static void rbd_rq_fn(struct request_queue *q)
1460 struct rbd_device *rbd_dev = q->queuedata;
1462 struct bio_pair *bp = NULL;
1464 while ((rq = blk_fetch_request(q))) {
1466 struct bio *rq_bio, *next_bio = NULL;
1471 int num_segs, cur_seg = 0;
1472 struct rbd_req_coll *coll;
1473 struct ceph_snap_context *snapc;
1475 dout("fetched request\n");
1477 /* filter out block requests we don't understand */
1478 if ((rq->cmd_type != REQ_TYPE_FS)) {
1479 __blk_end_request_all(rq, 0);
1483 /* deduce our operation (read, write) */
1484 do_write = (rq_data_dir(rq) == WRITE);
1486 size = blk_rq_bytes(rq);
1487 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1489 if (do_write && rbd_dev->mapping.read_only) {
1490 __blk_end_request_all(rq, -EROFS);
1494 spin_unlock_irq(q->queue_lock);
1496 down_read(&rbd_dev->header_rwsem);
/* A mapped snapshot may have been deleted underneath us */
1498 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1499 !rbd_dev->mapping.snap_exists) {
1500 up_read(&rbd_dev->header_rwsem);
1501 dout("request for non-existent snapshot");
1502 spin_lock_irq(q->queue_lock);
1503 __blk_end_request_all(rq, -ENXIO);
1507 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1509 up_read(&rbd_dev->header_rwsem);
1511 dout("%s 0x%x bytes at 0x%llx\n",
1512 do_write ? "write" : "read",
1513 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1515 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1516 if (num_segs <= 0) {
1517 spin_lock_irq(q->queue_lock);
1518 __blk_end_request_all(rq, num_segs);
1519 ceph_put_snap_context(snapc);
1522 coll = rbd_alloc_coll(num_segs);
1524 spin_lock_irq(q->queue_lock);
1525 __blk_end_request_all(rq, -ENOMEM);
1526 ceph_put_snap_context(snapc);
1531 /* a bio clone to be passed down to OSD req */
1532 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1533 op_size = rbd_segment_length(rbd_dev, ofs, size);
1534 kref_get(&coll->kref);
1535 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1536 op_size, GFP_ATOMIC);
1538 (void) rbd_do_op(rq, rbd_dev, snapc,
1540 bio, coll, cur_seg);
/* Clone failure: fail this segment's slot directly */
1542 rbd_coll_end_req_index(rq, coll, cur_seg,
1550 kref_put(&coll->kref, rbd_coll_release);
1553 bio_pair_release(bp);
1554 spin_lock_irq(q->queue_lock);
1556 ceph_put_snap_context(snapc);
1561 * a queue callback. Makes sure that we don't create a bio that spans across
1562 * multiple osd objects. One exception would be with a single page bios,
1563 * which we handle later at bio_chain_clone
/*
 * merge_bvec queue callback: report how many bytes may be added to a bio
 * before it would cross a 2^obj_order object boundary; a fresh bio may
 * always take at least one bvec.
 */
1565 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1566 struct bio_vec *bvec)
1568 struct rbd_device *rbd_dev = q->queuedata;
1569 unsigned int chunk_sectors;
1571 unsigned int bio_sectors;
1574 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1575 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1576 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1578 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1579 + bio_sectors)) << SECTOR_SHIFT;
1581 max = 0; /* bio_add cannot handle a negative return */
1582 if (max <= bvec->bv_len && bio_sectors == 0)
1583 return bvec->bv_len;
/* Tear down the gendisk and its request queue (if the disk was ever added) */
1587 static void rbd_free_disk(struct rbd_device *rbd_dev)
1589 struct gendisk *disk = rbd_dev->disk;
1594 if (disk->flags & GENHD_FL_UP)
1597 blk_cleanup_queue(disk->queue);
1602 * Read the complete header for the given rbd device.
1604 * Returns a pointer to a dynamically-allocated buffer containing
1605 * the complete and validated header. Caller can pass the address
1606 * of a variable that will be filled in with the version of the
1607 * header object at the time it was read.
1609 * Returns a pointer-coded errno if a failure occurs.
1611 static struct rbd_image_header_ondisk *
1612 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1614 struct rbd_image_header_ondisk *ondisk = NULL;
1621 * The complete header will include an array of its 64-bit
1622 * snapshot ids, followed by the names of those snapshots as
1623 * a contiguous block of NUL-terminated strings. Note that
1624 * the number of snapshots could change by the time we read
1625 * it in, in which case we re-read it.
/* Retry loop: size the buffer for snap_count snapshots, read, validate */
1632 size = sizeof (*ondisk);
1633 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1635 ondisk = kmalloc(size, GFP_KERNEL);
1637 return ERR_PTR(-ENOMEM);
1639 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1640 rbd_dev->header_name,
1642 (char *) ondisk, version);
1646 if (WARN_ON((size_t) ret < size)) {
1648 pr_warning("short header read for image %s"
1649 " (want %zd got %d)\n",
1650 rbd_dev->image_name, size, ret);
1653 if (!rbd_dev_ondisk_valid(ondisk)) {
1655 pr_warning("invalid header for image %s\n",
1656 rbd_dev->image_name);
/* Loop again if the snapshot count changed since we sized the buffer */
1660 names_size = le64_to_cpu(ondisk->snap_names_len);
1661 want_count = snap_count;
1662 snap_count = le32_to_cpu(ondisk->snap_count);
1663 } while (snap_count != want_count);
1670 return ERR_PTR(ret);
1674 * Reload the in-memory image header from the on-disk header object.
1676 static int rbd_read_header(struct rbd_device *rbd_dev,
1677 struct rbd_image_header *header)
1679 struct rbd_image_header_ondisk *ondisk;
1683 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
/* rbd_dev_v1_header_read() returns a pointer-coded errno on failure */
1685 return PTR_ERR(ondisk);
1686 ret = rbd_header_from_disk(header, ondisk);
1688 header->obj_version = ver;
/*
 * Remove every snapshot device from the rbd device's snapshot list.
 * Uses the _safe iterator because each entry is unlinked as we go.
 */
1694 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1696 struct rbd_snap *snap;
1697 struct rbd_snap *next;
1699 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1700 __rbd_remove_snap_dev(snap);
/*
 * Propagate a changed image size to the mapping and the gendisk.
 * Only applies when the base image (CEPH_NOSNAP) is mapped; a mapped
 * snapshot has a fixed size.
 */
1703 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1707 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
/* Convert byte count to 512-byte sectors for set_capacity() */
1710 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1711 dout("setting size to %llu sectors", (unsigned long long) size);
1712 rbd_dev->mapping.size = (u64) size;
1713 set_capacity(rbd_dev->disk, size);
1717 * Re-read the on-disk v1 header and fold the result into the device,
1717+* without disturbing fields that OSD requests may still reference.
1719 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1722 struct rbd_image_header h;
1724 ret = rbd_read_header(rbd_dev, &h);
1728 down_write(&rbd_dev->header_rwsem);
1730 /* Update image size, and check for resize of mapped image */
1731 rbd_dev->header.image_size = h.image_size;
1732 rbd_update_mapping_size(rbd_dev);
1734 /* rbd_dev->header.object_prefix shouldn't change */
1735 kfree(rbd_dev->header.snap_sizes);
1736 kfree(rbd_dev->header.snap_names);
1737 /* osd requests may still refer to snapc */
1738 ceph_put_snap_context(rbd_dev->header.snapc);
1741 *hver = h.obj_version;
1742 rbd_dev->header.obj_version = h.obj_version;
1743 rbd_dev->header.image_size = h.image_size;
/* Take ownership of the freshly-read snapshot data */
1744 rbd_dev->header.snapc = h.snapc;
1745 rbd_dev->header.snap_names = h.snap_names;
1746 rbd_dev->header.snap_sizes = h.snap_sizes;
1747 /* Free the extra copy of the object prefix */
1748 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1749 kfree(h.object_prefix);
1751 ret = rbd_dev_snaps_update(rbd_dev);
1753 ret = rbd_dev_snaps_register(rbd_dev);
1755 up_write(&rbd_dev->header_rwsem);
/*
 * Refresh device metadata, dispatching on the image format (v1 or v2).
 * Serialized under ctl_mutex.
 */
1760 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1764 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1765 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1766 if (rbd_dev->image_format == 1)
1767 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1769 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1770 mutex_unlock(&ctl_mutex);
/*
 * Allocate and configure the gendisk and request queue for this rbd
 * device, sizing I/O limits to the image's object size.
 */
1775 static int rbd_init_disk(struct rbd_device *rbd_dev)
1777 struct gendisk *disk;
1778 struct request_queue *q;
1781 /* create gendisk info */
1782 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1786 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1788 disk->major = rbd_dev->major;
1789 disk->first_minor = 0;
1790 disk->fops = &rbd_bd_ops;
1791 disk->private_data = rbd_dev;
1794 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1798 /* We use the default size, but let's be explicit about it. */
1799 blk_queue_physical_block_size(q, SECTOR_SIZE);
1801 /* set io sizes to object size */
1802 segment_size = rbd_obj_bytes(&rbd_dev->header);
1803 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1804 blk_queue_max_segment_size(q, segment_size);
1805 blk_queue_io_min(q, segment_size);
1806 blk_queue_io_opt(q, segment_size);
/* Keep bios from spanning object boundaries (see rbd_merge_bvec) */
1808 blk_queue_merge_bvec(q, rbd_merge_bvec);
1811 q->queuedata = rbd_dev;
1813 rbd_dev->disk = disk;
1815 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* sysfs show/store callbacks for per-device attributes (/sys/bus/rbd/devices/N/...) */
1828 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1830 return container_of(dev, struct rbd_device, dev);
/* Mapped size in bytes; capacity is kept in 512-byte sectors */
1833 static ssize_t rbd_size_show(struct device *dev,
1834 struct device_attribute *attr, char *buf)
1836 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1839 down_read(&rbd_dev->header_rwsem);
1840 size = get_capacity(rbd_dev->disk);
1841 up_read(&rbd_dev->header_rwsem);
1843 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1847 * Note this shows the features for whatever's mapped, which is not
1848 * necessarily the base image.
1850 static ssize_t rbd_features_show(struct device *dev,
1851 struct device_attribute *attr, char *buf)
1853 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855 return sprintf(buf, "0x%016llx\n",
1856 (unsigned long long) rbd_dev->mapping.features);
1859 static ssize_t rbd_major_show(struct device *dev,
1860 struct device_attribute *attr, char *buf)
1862 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1864 return sprintf(buf, "%d\n", rbd_dev->major);
1867 static ssize_t rbd_client_id_show(struct device *dev,
1868 struct device_attribute *attr, char *buf)
1870 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1872 return sprintf(buf, "client%lld\n",
1873 ceph_client_id(rbd_dev->rbd_client->client));
1876 static ssize_t rbd_pool_show(struct device *dev,
1877 struct device_attribute *attr, char *buf)
1879 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1881 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1884 static ssize_t rbd_pool_id_show(struct device *dev,
1885 struct device_attribute *attr, char *buf)
1887 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1889 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1892 static ssize_t rbd_name_show(struct device *dev,
1893 struct device_attribute *attr, char *buf)
1895 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1897 return sprintf(buf, "%s\n", rbd_dev->image_name);
1900 static ssize_t rbd_image_id_show(struct device *dev,
1901 struct device_attribute *attr, char *buf)
1903 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1905 return sprintf(buf, "%s\n", rbd_dev->image_id);
1909 * Shows the name of the currently-mapped snapshot (or
1910 * RBD_SNAP_HEAD_NAME for the base image).
1912 static ssize_t rbd_snap_show(struct device *dev,
1913 struct device_attribute *attr,
1916 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1918 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
/* Write-only "refresh" attribute: re-reads image metadata on demand */
1921 static ssize_t rbd_image_refresh(struct device *dev,
1922 struct device_attribute *attr,
1926 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1929 ret = rbd_dev_refresh(rbd_dev, NULL);
1931 return ret < 0 ? ret : size;
/* Attribute declarations and grouping for the rbd device sysfs type */
1934 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1935 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1936 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1937 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1938 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1939 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1940 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1941 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1942 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1943 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1945 static struct attribute *rbd_attrs[] = {
1946 &dev_attr_size.attr,
1947 &dev_attr_features.attr,
1948 &dev_attr_major.attr,
1949 &dev_attr_client_id.attr,
1950 &dev_attr_pool.attr,
1951 &dev_attr_pool_id.attr,
1952 &dev_attr_name.attr,
1953 &dev_attr_image_id.attr,
1954 &dev_attr_current_snap.attr,
1955 &dev_attr_refresh.attr,
1959 static struct attribute_group rbd_attr_group = {
1963 static const struct attribute_group *rbd_attr_groups[] = {
/* Empty release: rbd_dev lifetime is managed elsewhere, not by the device */
1968 static void rbd_sysfs_dev_release(struct device *dev)
1972 static struct device_type rbd_device_type = {
1974 .groups = rbd_attr_groups,
1975 .release = rbd_sysfs_dev_release,
/* sysfs attributes for per-snapshot child devices (snap_<name>/...) */
1983 static ssize_t rbd_snap_size_show(struct device *dev,
1984 struct device_attribute *attr,
1987 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1989 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1992 static ssize_t rbd_snap_id_show(struct device *dev,
1993 struct device_attribute *attr,
1996 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1998 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2001 static ssize_t rbd_snap_features_show(struct device *dev,
2002 struct device_attribute *attr,
2005 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2007 return sprintf(buf, "0x%016llx\n",
2008 (unsigned long long) snap->features);
2011 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2012 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2013 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2015 static struct attribute *rbd_snap_attrs[] = {
2016 &dev_attr_snap_size.attr,
2017 &dev_attr_snap_id.attr,
2018 &dev_attr_snap_features.attr,
2022 static struct attribute_group rbd_snap_attr_group = {
2023 .attrs = rbd_snap_attrs,
/* Device release callback: the snap is freed when its device goes away */
2026 static void rbd_snap_dev_release(struct device *dev)
2028 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2033 static const struct attribute_group *rbd_snap_attr_groups[] = {
2034 &rbd_snap_attr_group,
2038 static struct device_type rbd_snap_device_type = {
2039 .groups = rbd_snap_attr_groups,
2040 .release = rbd_snap_dev_release,
/*
 * A snapshot is "registered" once rbd_register_snap_dev() has set its
 * device type and registered it; the two conditions must agree.
 */
2043 static bool rbd_snap_registered(struct rbd_snap *snap)
2045 bool ret = snap->dev.type == &rbd_snap_device_type;
2046 bool reg = device_is_registered(&snap->dev);
/* Assert type-set and registered states are consistent (both or neither) */
2048 rbd_assert(!ret ^ reg);
/* Unlink a snapshot from the device list and drop its sysfs device, if any. */
2053 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2055 list_del(&snap->node);
2056 if (device_is_registered(&snap->dev))
2057 device_unregister(&snap->dev);
/*
 * Register a snapshot's sysfs device as a child of the rbd device,
 * named "snap_<snapshot name>".
 */
2060 static int rbd_register_snap_dev(struct rbd_snap *snap,
2061 struct device *parent)
2063 struct device *dev = &snap->dev;
2066 dev->type = &rbd_snap_device_type;
2067 dev->parent = parent;
2068 dev->release = rbd_snap_dev_release;
2069 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2070 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2072 ret = device_register(dev);
/*
 * Allocate and populate a new rbd_snap from the given name/id/size/
 * features.  Returns the new snap or a pointer-coded errno; the caller
 * links it into the device's snapshot list.
 */
2077 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2078 const char *snap_name,
2079 u64 snap_id, u64 snap_size,
2082 struct rbd_snap *snap;
2085 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2087 return ERR_PTR(-ENOMEM);
/* The snap owns its own copy of the name */
2090 snap->name = kstrdup(snap_name, GFP_KERNEL);
2095 snap->size = snap_size;
2096 snap->features = snap_features;
2104 return ERR_PTR(ret);
/*
 * Return size/features and the name for snapshot index "which" of a
 * format 1 image.  Names are stored as consecutive NUL-terminated
 * strings, so we walk forward to the requested one.
 */
2107 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2108 u64 *snap_size, u64 *snap_features)
2112 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2114 *snap_size = rbd_dev->header.snap_sizes[which];
2115 *snap_features = 0; /* No features for v1 */
2117 /* Skip over names until we find the one we are looking for */
2119 snap_name = rbd_dev->header.snap_names;
2121 snap_name += strlen(snap_name) + 1;
2127 * Get the size and object order for an image snapshot, or if
2128 * snap_id is CEPH_NOSNAP, gets this information for the base
2131 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2132 u8 *order, u64 *snap_size)
2134 __le64 snapid = cpu_to_le64(snap_id);
/* Reply layout of the "get_size" class method: order byte + le64 size */
2139 } __attribute__ ((packed)) size_buf = { 0 };
2141 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2143 (char *) &snapid, sizeof (snapid),
2144 (char *) &size_buf, sizeof (size_buf),
2145 CEPH_OSD_FLAG_READ, NULL);
2146 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2150 *order = size_buf.order;
2151 *snap_size = le64_to_cpu(size_buf.size);
2153 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2154 (unsigned long long) snap_id, (unsigned int) *order,
2155 (unsigned long long) *snap_size);
/* Convenience wrapper: fetch size/order of the base image (HEAD) */
2160 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2162 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2163 &rbd_dev->header.obj_order,
2164 &rbd_dev->header.image_size);
/*
 * Fetch the object prefix (a.k.a. block name) for a format 2 image via
 * the "get_object_prefix" class method and record it in the header.
 */
2167 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2173 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2177 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2178 "rbd", "get_object_prefix",
2180 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2181 CEPH_OSD_FLAG_READ, NULL);
2182 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2185 ret = 0; /* rbd_req_sync_exec() can return positive */
/* Decode the length-prefixed string reply into a fresh allocation */
2188 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2189 p + RBD_OBJ_PREFIX_LEN_MAX,
2192 if (IS_ERR(rbd_dev->header.object_prefix)) {
2193 ret = PTR_ERR(rbd_dev->header.object_prefix);
2194 rbd_dev->header.object_prefix = NULL;
2196 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for an image snapshot (or the base image when
 * snap_id is CEPH_NOSNAP).  Fails if any incompatible feature bit we
 * don't support is set.
 */
2205 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2208 __le64 snapid = cpu_to_le64(snap_id);
/* Reply layout of "get_features": le64 features + le64 incompat mask */
2212 } features_buf = { 0 };
2216 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2217 "rbd", "get_features",
2218 (char *) &snapid, sizeof (snapid),
2219 (char *) &features_buf, sizeof (features_buf),
2220 CEPH_OSD_FLAG_READ, NULL);
2221 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* Refuse images requiring features this driver doesn't implement */
2225 incompat = le64_to_cpu(features_buf.incompat);
2226 if (incompat & ~RBD_FEATURES_ALL)
2229 *snap_features = le64_to_cpu(features_buf.features);
2231 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2232 (unsigned long long) snap_id,
2233 (unsigned long long) *snap_features,
2234 (unsigned long long) le64_to_cpu(features_buf.incompat));
/* Convenience wrapper: features of the base image (HEAD) */
2239 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2241 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2242 &rbd_dev->header.features)
/*
 * Fetch and decode the snapshot context (seq + snapshot id array) for a
 * format 2 image, replacing rbd_dev->header.snapc.
 */
2245 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2254 struct ceph_snap_context *snapc;
2258 * We'll need room for the seq value (maximum snapshot id),
2259 * snapshot count, and array of that many snapshot ids.
2260 * For now we have a fixed upper limit on the number we're
2261 * prepared to receive.
2263 size = sizeof (__le64) + sizeof (__le32) +
2264 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2265 reply_buf = kzalloc(size, GFP_KERNEL);
2269 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2270 "rbd", "get_snapcontext",
2273 CEPH_OSD_FLAG_READ, ver);
2274 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2280 end = (char *) reply_buf + size;
2281 ceph_decode_64_safe(&p, end, seq, out);
2282 ceph_decode_32_safe(&p, end, snap_count, out);
2285 * Make sure the reported number of snapshot ids wouldn't go
2286 * beyond the end of our buffer. But before checking that,
2287 * make sure the computed size of the snapshot context we
2288 * allocate is representable in a size_t.
2290 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2295 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2298 size = sizeof (struct ceph_snap_context) +
2299 snap_count * sizeof (snapc->snaps[0]);
2300 snapc = kmalloc(size, GFP_KERNEL);
2306 atomic_set(&snapc->nref, 1);
2308 snapc->num_snaps = snap_count;
2309 for (i = 0; i < snap_count; i++)
2310 snapc->snaps[i] = ceph_decode_64(&p);
2312 rbd_dev->header.snapc = snapc;
2314 dout(" snap context seq = %llu, snap_count = %u\n",
2315 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * Look up the name of the snapshot at index "which" in the snapshot
 * context of a format 2 image.  Returns a newly-allocated string or a
 * pointer-coded errno.
 */
2323 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2331 size_t snap_name_len;
2334 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2335 reply_buf = kmalloc(size, GFP_KERNEL);
2337 return ERR_PTR(-ENOMEM);
2339 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2340 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2341 "rbd", "get_snapshot_name",
2342 (char *) &snap_id, sizeof (snap_id),
2344 CEPH_OSD_FLAG_READ, NULL);
2345 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2350 end = (char *) reply_buf + size;
/* Reply is a length-prefixed string; copy it into its own allocation */
2352 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2354 if (IS_ERR(snap_name)) {
2355 ret = PTR_ERR(snap_name);
2358 dout(" snap_id 0x%016llx snap_name = %s\n",
2359 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2367 return ERR_PTR(ret);
/*
 * Gather size, features and name for snapshot index "which" of a
 * format 2 image.  Returns the name (or pointer-coded errno); size and
 * features are returned through the out parameters.
 */
2370 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2371 u64 *snap_size, u64 *snap_features)
2377 snap_id = rbd_dev->header.snapc->snaps[which];
2378 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2380 return ERR_PTR(ret);
2381 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2383 return ERR_PTR(ret);
2385 return rbd_dev_v2_snap_name(rbd_dev, which);
/* Format-independent entry point; dispatches on image format */
2388 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2389 u64 *snap_size, u64 *snap_features)
2391 if (rbd_dev->image_format == 1)
2392 return rbd_dev_v1_snap_info(rbd_dev, which,
2393 snap_size, snap_features);
2394 if (rbd_dev->image_format == 2)
2395 return rbd_dev_v2_snap_info(rbd_dev, which,
2396 snap_size, snap_features);
2397 return ERR_PTR(-EINVAL);
/* Re-fetch size, snapshot context and snapshot list for a format 2 image. */
2400 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2405 down_write(&rbd_dev->header_rwsem);
2407 /* Grab old order first, to see if it changes */
/*
 * NOTE(review): the trailing ',' below is the comma operator, not a
 * statement terminator; it works but should be ';'.
 */
2409 obj_order = rbd_dev->header.obj_order,
2410 ret = rbd_dev_v2_image_size(rbd_dev);
/* A changed object order after mapping is unsupported */
2413 if (rbd_dev->header.obj_order != obj_order) {
2417 rbd_update_mapping_size(rbd_dev);
2419 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2420 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2423 ret = rbd_dev_snaps_update(rbd_dev);
2424 dout("rbd_dev_snaps_update returned %d\n", ret);
2427 ret = rbd_dev_snaps_register(rbd_dev);
2428 dout("rbd_dev_snaps_register returned %d\n", ret);
2430 up_write(&rbd_dev->header_rwsem);
2436 * Scan the rbd device's current snapshot list and compare it to the
2437 * newly-received snapshot context. Remove any existing snapshots
2438 * not present in the new snapshot context. Add a new snapshot for
2439 * any snapshots in the snapshot context not in the current list.
2440 * And verify there are no changes to snapshots we already know
2443 * Assumes the snapshots in the snapshot context are sorted by
2444 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2445 * are also maintained in that order.)
2447 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2449 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2450 const u32 snap_count = snapc->num_snaps;
2451 struct list_head *head = &rbd_dev->snaps;
2452 struct list_head *links = head->next;
2455 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
/* Merge-walk both sorted sequences until each is exhausted */
2456 while (index < snap_count || links != head) {
2458 struct rbd_snap *snap;
2461 u64 snap_features = 0;
2463 snap_id = index < snap_count ? snapc->snaps[index]
2465 snap = links != head ? list_entry(links, struct rbd_snap, node)
2467 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2469 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2470 struct list_head *next = links->next;
2472 /* Existing snapshot not in the new snap context */
/* If it's the mapped snapshot, the mapping is now stale */
2474 if (rbd_dev->mapping.snap_id == snap->id)
2475 rbd_dev->mapping.snap_exists = false;
2476 __rbd_remove_snap_dev(snap);
2477 dout("%ssnap id %llu has been removed\n",
2478 rbd_dev->mapping.snap_id == snap->id ?
2480 (unsigned long long) snap->id);
2482 /* Done with this list entry; advance */
2488 snap_name = rbd_dev_snap_info(rbd_dev, index,
2489 &snap_size, &snap_features);
2490 if (IS_ERR(snap_name))
2491 return PTR_ERR(snap_name);
2493 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2494 (unsigned long long) snap_id);
2495 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2496 struct rbd_snap *new_snap;
2498 /* We haven't seen this snapshot before */
2500 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2501 snap_id, snap_size, snap_features);
2502 if (IS_ERR(new_snap)) {
2503 int err = PTR_ERR(new_snap);
2505 dout(" failed to add dev, error %d\n", err);
2510 /* New goes before existing, or at end of list */
/* NOTE(review): the " at end\n" argument carries a stray '\n' */
2512 dout(" added dev%s\n", snap ? "" : " at end\n");
2514 list_add_tail(&new_snap->node, &snap->node);
2516 list_add_tail(&new_snap->node, head);
2518 /* Already have this one */
2520 dout(" already present\n");
/* Snapshots are immutable; nothing but existence should change */
2522 rbd_assert(snap->size == snap_size);
2523 rbd_assert(!strcmp(snap->name, snap_name));
2524 rbd_assert(snap->features == snap_features);
2526 /* Done with this list entry; advance */
2528 links = links->next;
2531 /* Advance to the next entry in the snapshot context */
2535 dout("%s: done\n", __func__);
2541 * Scan the list of snapshots and register the devices for any that
2542 * have not already been registered.
2544 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2546 struct rbd_snap *snap;
2549 dout("%s called\n", __func__);
/* Parent device must be registered before its snapshot children */
2550 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2553 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2554 if (!rbd_snap_registered(snap)) {
2555 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2560 dout("%s: returning %d\n", __func__, ret);
/*
 * Register the rbd device on the rbd bus, named by its numeric dev_id,
 * under ctl_mutex.
 */
2565 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2570 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2572 dev = &rbd_dev->dev;
2573 dev->bus = &rbd_bus_type;
2574 dev->type = &rbd_device_type;
2575 dev->parent = &rbd_root_dev;
2576 dev->release = rbd_dev_release;
2577 dev_set_name(dev, "%d", rbd_dev->dev_id);
2578 ret = device_register(dev);
2580 mutex_unlock(&ctl_mutex);
/* Counterpart: remove the device from the bus */
2585 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2587 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object, retrying after a refresh if
 * the OSD reports -ERANGE (stale header version).
 */
2590 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2595 ret = rbd_req_sync_watch(rbd_dev);
2596 if (ret == -ERANGE) {
2597 rc = rbd_dev_refresh(rbd_dev, NULL);
2601 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1 */
2606 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2609 * Get a unique rbd identifier for the given new rbd_dev, and add
2610 * the rbd_dev to the global list. The minimum rbd id is 1.
2612 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2614 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2616 spin_lock(&rbd_dev_list_lock);
2617 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2618 spin_unlock(&rbd_dev_list_lock);
2619 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2620 (unsigned long long) rbd_dev->dev_id);
2624 * Remove an rbd_dev from the global list, and record that its
2625 * identifier is no longer in use.
2627 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2629 struct list_head *tmp;
2630 int rbd_id = rbd_dev->dev_id;
2633 rbd_assert(rbd_id > 0);
2635 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2636 (unsigned long long) rbd_dev->dev_id);
2637 spin_lock(&rbd_dev_list_lock);
2638 list_del_init(&rbd_dev->node);
2641 * If the id being "put" is not the current maximum, there
2642 * is nothing special we need to do.
2644 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2645 spin_unlock(&rbd_dev_list_lock);
2650 * We need to update the current maximum id. Search the
2651 * list to find out what it is. We're more likely to find
2652 * the maximum at the end, so search the list backward.
2655 list_for_each_prev(tmp, &rbd_dev_list) {
2656 struct rbd_device *rbd_dev;
2658 rbd_dev = list_entry(tmp, struct rbd_device, node);
2659 if (rbd_dev->dev_id > max_id)
2660 max_id = rbd_dev->dev_id;
2662 spin_unlock(&rbd_dev_list_lock);
2665 * The max id could have been updated by rbd_dev_id_get(), in
2666 * which case it now accurately reflects the new maximum.
2667 * Be careful not to overwrite the maximum value in that
/* cmpxchg only succeeds if nobody allocated a newer id meanwhile */
2670 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2671 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
2694 * Finds the next token in *buf, and if the provided token buffer is
2695 * big enough, copies the found token into it. The result, if
2696 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2697 * must be terminated with '\0' on entry.
2699 * Returns the length of the token found (not including the '\0').
2700 * Return value will be 0 if no token is found, and it will be >=
2701 * token_size if the token would not fit.
2703 * The *buf pointer will be updated to point beyond the end of the
2704 * found token. Note that this occurs even if the token buffer is
2705 * too small to hold it.
2707 static inline size_t copy_token(const char **buf,
2713 len = next_token(buf);
/* Copy only when the token (plus NUL) fits in the caller's buffer */
2714 if (len < token_size) {
2715 memcpy(token, *buf, len);
2716 *(token + len) = '\0';
2724 * Finds the next token in *buf, dynamically allocates a buffer big
2725 * enough to hold a copy of it, and copies the token into the new
2726 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2727 * that a duplicate buffer is created even for a zero-length token.
2729 * Returns a pointer to the newly-allocated duplicate, or a null
2730 * pointer if memory for the duplicate was not available. If
2731 * the lenp argument is a non-null pointer, the length of the token
2732 * (not including the '\0') is returned in *lenp.
2734 * If successful, the *buf pointer will be updated to point beyond
2735 * the end of the found token.
2737 * Note: uses GFP_KERNEL for allocation.
2739 static inline char *dup_token(const char **buf, size_t *lenp)
2744 len = next_token(buf);
2745 dup = kmalloc(len + 1, GFP_KERNEL);
2749 memcpy(dup, *buf, len);
2750 *(dup + len) = '\0';
2760 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2761 * rbd_md_name, and name fields of the given rbd_dev, based on the
2762 * list of monitor addresses and other options provided via
2763 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2764 * copy of the snapshot name to map if successful, or a
2765 * pointer-coded error otherwise.
2767 * Note: rbd_dev is assumed to have been initially zero-filled.
2769 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2771 const char **mon_addrs,
2772 size_t *mon_addrs_size,
2774 size_t options_size)
2777 char *err_ptr = ERR_PTR(-EINVAL);
2780 /* The first four tokens are required */
/* Token 1: monitor address list (returned by reference, not copied) */
2782 len = next_token(&buf);
2785 *mon_addrs_size = len + 1;
/* Token 2: option string, copied into the caller-provided buffer */
2790 len = copy_token(&buf, options, options_size);
2791 if (!len || len >= options_size)
2794 err_ptr = ERR_PTR(-ENOMEM);
/* Token 3: pool name (dup'd; freed on the error path below) */
2795 rbd_dev->pool_name = dup_token(&buf, NULL);
2796 if (!rbd_dev->pool_name)
/* Token 4: image name */
2799 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2800 if (!rbd_dev->image_name)
2803 /* Snapshot name is optional; default is to use "head" */
2805 len = next_token(&buf);
2806 if (len > RBD_MAX_SNAP_NAME_LEN) {
2807 err_ptr = ERR_PTR(-ENAMETOOLONG);
2811 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2812 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2814 snap_name = kmalloc(len + 1, GFP_KERNEL);
2817 memcpy(snap_name, buf, len);
2818 *(snap_name + len) = '\0';
/* Error unwind: release whatever was duplicated before the failure */
2823 kfree(rbd_dev->image_name);
2824 rbd_dev->image_name = NULL;
2825 rbd_dev->image_name_len = 0;
2826 kfree(rbd_dev->pool_name);
2827 rbd_dev->pool_name = NULL;
2833 * An rbd format 2 image has a unique identifier, distinct from the
2834 * name given to it by the user. Internally, that identifier is
2835 * what's used to specify the names of objects related to the image.
2837 * A special "rbd id" object is used to map an rbd image name to its
2838 * id. If that object doesn't exist, then there is no v2 rbd image
2839 * with the supplied name.
2841 * This function will record the given rbd_dev's image_id field if
2842 * it can be determined, and in that case will return 0. If any
2843 * errors occur a negative errno will be returned and the rbd_dev's
2844 * image_id field will be unchanged (and should be NULL).
2846 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2855 * First, see if the format 2 image id file exists, and if
2856 * so, get the image's persistent id from it.
2858 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2859 object_name = kmalloc(size, GFP_NOIO);
2862 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2863 dout("rbd id object name is %s\n", object_name);
2865 /* Response will be an encoded string, which includes a length */
2867 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2868 response = kzalloc(size, GFP_NOIO);
2874 ret = rbd_req_sync_exec(rbd_dev, object_name,
2877 response, RBD_IMAGE_ID_LEN_MAX,
2878 CEPH_OSD_FLAG_READ, NULL);
2879 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2882 ret = 0; /* rbd_req_sync_exec() can return positive */
/* Decode the length-prefixed id string into its own allocation */
2885 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2886 p + RBD_IMAGE_ID_LEN_MAX,
2887 &rbd_dev->image_id_len,
2889 if (IS_ERR(rbd_dev->image_id)) {
2890 ret = PTR_ERR(rbd_dev->image_id);
2891 rbd_dev->image_id = NULL;
2893 dout("image_id is %s\n", rbd_dev->image_id);
/*
 * Probe for a format 1 image: set an empty image id, build the header
 * object name ("<image>" + RBD_SUFFIX), and read the header.
 */
2902 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2907 /* Version 1 images have no id; empty string is used */
2909 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2910 if (!rbd_dev->image_id)
2912 rbd_dev->image_id_len = 0;
2914 /* Record the header object name for this rbd image. */
2916 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2917 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918 if (!rbd_dev->header_name) {
2922 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2924 /* Populate rbd image metadata */
2926 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2929 rbd_dev->image_format = 1;
2931 dout("discovered version 1 image, header name is %s\n",
2932 rbd_dev->header_name);
/* Error unwind: drop the header name and image id on failure */
2937 kfree(rbd_dev->header_name);
2938 rbd_dev->header_name = NULL;
2939 kfree(rbd_dev->image_id);
2940 rbd_dev->image_id = NULL;
/*
 * Probe for a format 2 image: build the header object name from the
 * image id, then fetch size/order, object prefix, features and the
 * snapshot context.
 */
2945 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2952 * Image id was filled in by the caller. Record the header
2953 * object name for this rbd image.
2955 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2956 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2957 if (!rbd_dev->header_name)
2959 sprintf(rbd_dev->header_name, "%s%s",
2960 RBD_HEADER_PREFIX, rbd_dev->image_id);
2962 /* Get the size and object order for the image */
2964 ret = rbd_dev_v2_image_size(rbd_dev);
2968 /* Get the object prefix (a.k.a. block_name) for the image */
2970 ret = rbd_dev_v2_object_prefix(rbd_dev);
2974 /* Get and check the features for the image */
2976 ret = rbd_dev_v2_features(rbd_dev);
2980 /* crypto and compression type aren't (yet) supported for v2 images */
2982 rbd_dev->header.crypt_type = 0;
2983 rbd_dev->header.comp_type = 0;
2985 /* Get the snapshot context, plus the header version */
2987 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2990 rbd_dev->header.obj_version = ver;
2992 rbd_dev->image_format = 2;
2994 dout("discovered version 2 image, header name is %s\n",
2995 rbd_dev->header_name);
/* Error unwind: release everything allocated above */
2999 kfree(rbd_dev->header_name);
3000 rbd_dev->header_name = NULL;
3001 kfree(rbd_dev->header.object_prefix);
3002 rbd_dev->header.object_prefix = NULL;
3008 * Probe for the existence of the header object for the given rbd
3009 * device. For format 2 images this includes determining the image
3012 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3017 * Get the id from the image id object. If it's not a
3018 * format 2 image, we'll get ENOENT back, and we'll assume
3019 * it's a format 1 image.
3021 ret = rbd_dev_image_id(rbd_dev);
/*
 * Dispatch on the result of the id lookup: v1 probe when no image id
 * object exists, v2 probe when one does.  (The branch lines are
 * elided in this view -- presumably ENOENT selects the v1 path.)
 */
3023 ret = rbd_dev_v1_probe(rbd_dev);
3025 ret = rbd_dev_v2_probe(rbd_dev);
/* Only reached on failure of whichever probe ran. */
3027 dout("probe failed, returning %d\n", ret);
/*
 * rbd_add() -- sysfs bus "add" store handler.
 *
 * Parses the user-supplied add string (monitor addresses, options,
 * pool, image and snapshot names), connects to the cluster, probes
 * the image, and brings up the mapped block device.  Returns count
 * on success or a negative errno.  NOTE(review): most error-check
 * and goto-label lines are elided in this view; the unwind sequence
 * near the bottom indicates the intended cleanup order.
 */
3032 static ssize_t rbd_add(struct bus_type *bus,
3037 struct rbd_device *rbd_dev = NULL;
3038 const char *mon_addrs = NULL;
3039 size_t mon_addrs_size = 0;
3040 struct ceph_osd_client *osdc;
/* Pin the module while a device exists; dropped on error/release. */
3044 if (!try_module_get(THIS_MODULE))
3047 options = kmalloc(count, GFP_KERNEL);
3050 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3054 /* static rbd_device initialization */
3055 spin_lock_init(&rbd_dev->lock);
3056 INIT_LIST_HEAD(&rbd_dev->node);
3057 INIT_LIST_HEAD(&rbd_dev->snaps);
3058 init_rwsem(&rbd_dev->header_rwsem);
3060 /* parse add command */
3061 snap_name = rbd_add_parse_args(rbd_dev, buf,
3062 &mon_addrs, &mon_addrs_size, options, count);
3063 if (IS_ERR(snap_name)) {
3064 rc = PTR_ERR(snap_name);
/* mon_addrs_size apparently includes a terminator, hence the - 1 */
3068 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* Map the pool name to its numeric id via the current osdmap. */
3073 osdc = &rbd_dev->rbd_client->client->osdc;
3074 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3076 goto err_out_client;
3077 rbd_dev->pool_id = rc;
/* Determine image format (1 or 2) and read its header metadata. */
3079 rc = rbd_dev_probe(rbd_dev);
3081 goto err_out_client;
3083 /* no need to lock here, as rbd_dev is not registered yet */
3084 rc = rbd_dev_snaps_update(rbd_dev);
3086 goto err_out_header;
3088 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3090 goto err_out_header;
3092 /* generate unique id: find highest unique id, add one */
3093 rbd_dev_id_get(rbd_dev);
3095 /* Fill in the device name, now that we have its id. */
3096 BUILD_BUG_ON(DEV_NAME_LEN
3097 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3098 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3100 /* Get our block major device number. */
/* register_blkdev(0, ...) requests a dynamically allocated major. */
3102 rc = register_blkdev(0, rbd_dev->name);
3105 rbd_dev->major = rc;
3107 /* Set up the blkdev mapping. */
3109 rc = rbd_init_disk(rbd_dev);
3111 goto err_out_blkdev;
3113 rc = rbd_bus_add_dev(rbd_dev);
3118 * At this point cleanup in the event of an error is the job
3119 * of the sysfs code (initiated by rbd_bus_del_dev()).
3122 down_write(&rbd_dev->header_rwsem);
3123 rc = rbd_dev_snaps_register(rbd_dev);
3124 up_write(&rbd_dev->header_rwsem);
3128 rc = rbd_init_watch_dev(rbd_dev);
3132 /* Everything's ready. Announce the disk to the world. */
3134 add_disk(rbd_dev->disk);
3136 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3137 (unsigned long long) rbd_dev->mapping.size);
/* --- error unwind (labels elided in this view), deepest first --- */
3142 /* this will also clean up rest of rbd_dev stuff */
3144 rbd_bus_del_dev(rbd_dev);
3149 rbd_free_disk(rbd_dev);
3151 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3153 rbd_dev_id_put(rbd_dev);
3155 rbd_header_free(&rbd_dev->header);
3157 kfree(rbd_dev->header_name);
3158 rbd_put_client(rbd_dev);
3159 kfree(rbd_dev->image_id);
3161 kfree(rbd_dev->mapping.snap_name);
3162 kfree(rbd_dev->image_name);
3163 kfree(rbd_dev->pool_name);
3168 dout("Error adding device %s\n", buf);
/* Balances the try_module_get() at the top (error path). */
3169 module_put(THIS_MODULE);
3171 return (ssize_t) rc;
/*
 * __rbd_get_dev() -- look up a mapped rbd device by its numeric id.
 *
 * Linear scan of rbd_dev_list under rbd_dev_list_lock.  The return
 * statements are elided in this view; the unlock inside the match
 * suggests it returns the matching rbd_device on a hit and NULL
 * otherwise -- confirm against the full source.
 */
3174 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3176 struct list_head *tmp;
3177 struct rbd_device *rbd_dev;
3179 spin_lock(&rbd_dev_list_lock);
3180 list_for_each(tmp, &rbd_dev_list) {
3181 rbd_dev = list_entry(tmp, struct rbd_device, node);
3182 if (rbd_dev->dev_id == dev_id) {
/* Drop the lock before handing back the match. */
3183 spin_unlock(&rbd_dev_list_lock);
3187 spin_unlock(&rbd_dev_list_lock);
/*
 * rbd_dev_release() -- driver-model release callback; final teardown
 * of an rbd_device once its embedded struct device is dropped.
 * Tears down roughly in reverse order of setup: the header watch,
 * the ceph client reference, the disk and block major, the parsed
 * header fields, then the name strings and the device id.
 */
3191 static void rbd_dev_release(struct device *dev)
3193 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the lingering watch on the header object, if one is active. */
3195 if (rbd_dev->watch_request) {
3196 struct ceph_client *client = rbd_dev->rbd_client->client;
3198 ceph_osdc_unregister_linger_request(&client->osdc,
3199 rbd_dev->watch_request);
3201 if (rbd_dev->watch_event)
3202 rbd_req_sync_unwatch(rbd_dev);
3204 rbd_put_client(rbd_dev);
3206 /* clean up and free blkdev */
3207 rbd_free_disk(rbd_dev);
3208 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3210 /* release allocated disk header fields */
3211 rbd_header_free(&rbd_dev->header);
3213 /* done with the id, and with the rbd_dev */
3214 kfree(rbd_dev->mapping.snap_name);
3215 kfree(rbd_dev->image_id);
3216 kfree(rbd_dev->header_name);
3217 kfree(rbd_dev->pool_name);
3218 kfree(rbd_dev->image_name);
3219 rbd_dev_id_put(rbd_dev);
/* Balances the try_module_get() taken in rbd_add(). */
3222 /* release module ref */
3223 module_put(THIS_MODULE);
/*
 * rbd_remove() -- sysfs bus "remove" store handler.
 *
 * Parses the target device id from the user buffer, looks the device
 * up under ctl_mutex, then removes its snapshots and deletes the
 * device (which triggers rbd_dev_release() via the device model).
 * NOTE(review): error-return lines are elided in this view.
 */
3226 static ssize_t rbd_remove(struct bus_type *bus,
3230 struct rbd_device *rbd_dev = NULL;
3235 rc = strict_strtoul(buf, 10, &ul);
3239 /* convert to int; abort if we lost anything in the conversion */
3240 target_id = (int) ul;
3241 if (target_id != ul)
/* SINGLE_DEPTH_NESTING is a lockdep annotation for nested acquire. */
3244 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3246 rbd_dev = __rbd_get_dev(target_id);
3252 __rbd_remove_all_snaps(rbd_dev);
3253 rbd_bus_del_dev(rbd_dev);
3256 mutex_unlock(&ctl_mutex);
3262 * create control files in sysfs
/*
 * rbd_sysfs_init() -- register the rbd root device and bus type that
 * anchor /sys/bus/rbd (where the add/remove control files live).
 * On bus registration failure the root device is unregistered again.
 * NOTE(review): the return statements are elided in this view.
 */
3265 static int rbd_sysfs_init(void)
3269 ret = device_register(&rbd_root_dev);
3273 ret = bus_register(&rbd_bus_type);
3275 device_unregister(&rbd_root_dev);
3280 static void rbd_sysfs_cleanup(void)
3282 bus_unregister(&rbd_bus_type);
3283 device_unregister(&rbd_root_dev);
/*
 * rbd_init() -- module entry point: set up the sysfs bus/root device
 * and announce the driver.  NOTE(review): the error check on rc and
 * the return statements are elided in this view.
 */
3286 int __init rbd_init(void)
3290 rc = rbd_sysfs_init();
3293 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* rbd_exit() -- module exit point; remove the sysfs bus and root device. */
3297 void __exit rbd_exit(void)
3299 rbd_sysfs_cleanup();
3302 module_init(rbd_init);
3303 module_exit(rbd_exit);
3305 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3306 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3307 MODULE_DESCRIPTION("rados block device");
3309 /* following authorship retained from original osdblk.c */
3310 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3312 MODULE_LICENSE("GPL");