2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66 #define RBD_MAX_OPT_LEN 1024
68 #define RBD_SNAP_HEAD_NAME "-"
70 #define RBD_IMAGE_ID_LEN_MAX 64
71 #define RBD_OBJ_PREFIX_LEN_MAX 64
74 * An RBD device name will be "rbd#", where the "rbd" comes from
75 * RBD_DRV_NAME above, and # is a unique integer identifier.
76 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
77 * enough to hold all possible device names.
79 #define DEV_NAME_LEN 32
80 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
82 #define RBD_READ_ONLY_DEFAULT false
85 * block device image metadata (in-memory version)
87 struct rbd_image_header {
88 /* These four fields never change for a given rbd image */
95 /* The remaining fields need to be updated occasionally */
97 struct ceph_snap_context *snapc;
109 * an instance of the client. multiple devices may share an rbd client.
112 struct ceph_client *client;
114 struct list_head node;
118 * a request completion status
120 struct rbd_req_status {
127 * a collection of requests
129 struct rbd_req_coll {
133 struct rbd_req_status status[0];
137 * a single io request
140 struct request *rq; /* blk layer request */
141 struct bio *bio; /* cloned bio */
142 struct page **pages; /* list of used pages */
145 struct rbd_req_coll *coll;
152 struct list_head node;
170 int dev_id; /* blkdev unique id */
172 int major; /* blkdev assigned major */
173 struct gendisk *disk; /* blkdev's gendisk and rq */
175 u32 image_format; /* Either 1 or 2 */
176 struct rbd_options rbd_opts;
177 struct rbd_client *rbd_client;
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
181 spinlock_t lock; /* queue lock */
183 struct rbd_image_header header;
187 size_t image_name_len;
192 struct ceph_osd_event *watch_event;
193 struct ceph_osd_request *watch_request;
195 /* protects updating the header */
196 struct rw_semaphore header_rwsem;
198 struct rbd_mapping mapping;
200 struct list_head node;
202 /* list of snapshots */
203 struct list_head snaps;
209 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
211 static LIST_HEAD(rbd_dev_list); /* devices */
212 static DEFINE_SPINLOCK(rbd_dev_list_lock);
214 static LIST_HEAD(rbd_client_list); /* clients */
215 static DEFINE_SPINLOCK(rbd_client_list_lock);
217 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
220 static void rbd_dev_release(struct device *dev);
221 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
223 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
225 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
228 static struct bus_attribute rbd_bus_attrs[] = {
229 __ATTR(add, S_IWUSR, NULL, rbd_add),
230 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
234 static struct bus_type rbd_bus_type = {
236 .bus_attrs = rbd_bus_attrs,
239 static void rbd_root_dev_release(struct device *dev)
243 static struct device rbd_root_dev = {
245 .release = rbd_root_dev_release,
#ifdef RBD_DEBUG
/*
 * rbd_assert() - report and BUG() on a supposedly-impossible condition.
 * Active only when RBD_DEBUG is defined (see top of file).
 */
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
261 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
263 return get_device(&rbd_dev->dev);
266 static void rbd_put_dev(struct rbd_device *rbd_dev)
268 put_device(&rbd_dev->dev);
271 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
273 static int rbd_open(struct block_device *bdev, fmode_t mode)
275 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
277 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
280 rbd_get_dev(rbd_dev);
281 set_device_ro(bdev, rbd_dev->mapping.read_only);
286 static int rbd_release(struct gendisk *disk, fmode_t mode)
288 struct rbd_device *rbd_dev = disk->private_data;
290 rbd_put_dev(rbd_dev);
295 static const struct block_device_operations rbd_bd_ops = {
296 .owner = THIS_MODULE,
298 .release = rbd_release,
302 * Initialize an rbd client instance.
305 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
307 struct rbd_client *rbdc;
310 dout("rbd_client_create\n");
311 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
315 kref_init(&rbdc->kref);
316 INIT_LIST_HEAD(&rbdc->node);
318 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
320 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
321 if (IS_ERR(rbdc->client))
323 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
325 ret = ceph_open_session(rbdc->client);
329 spin_lock(&rbd_client_list_lock);
330 list_add_tail(&rbdc->node, &rbd_client_list);
331 spin_unlock(&rbd_client_list_lock);
333 mutex_unlock(&ctl_mutex);
335 dout("rbd_client_create created %p\n", rbdc);
339 ceph_destroy_client(rbdc->client);
341 mutex_unlock(&ctl_mutex);
345 ceph_destroy_options(ceph_opts);
350 * Find a ceph client with specific addr and configuration. If
351 * found, bump its reference count.
353 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
355 struct rbd_client *client_node;
358 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
361 spin_lock(&rbd_client_list_lock);
362 list_for_each_entry(client_node, &rbd_client_list, node) {
363 if (!ceph_compare_options(ceph_opts, client_node->client)) {
364 kref_get(&client_node->kref);
369 spin_unlock(&rbd_client_list_lock);
371 return found ? client_node : NULL;
381 /* string args above */
384 /* Boolean args above */
388 static match_table_t rbd_opts_tokens = {
390 /* string args above */
391 {Opt_read_only, "mapping.read_only"},
392 {Opt_read_only, "ro"}, /* Alternate spelling */
393 {Opt_read_write, "read_write"},
394 {Opt_read_write, "rw"}, /* Alternate spelling */
395 /* Boolean args above */
399 static int parse_rbd_opts_token(char *c, void *private)
401 struct rbd_options *rbd_opts = private;
402 substring_t argstr[MAX_OPT_ARGS];
403 int token, intval, ret;
405 token = match_token(c, rbd_opts_tokens, argstr);
409 if (token < Opt_last_int) {
410 ret = match_int(&argstr[0], &intval);
412 pr_err("bad mount option arg (not int) "
416 dout("got int token %d val %d\n", token, intval);
417 } else if (token > Opt_last_int && token < Opt_last_string) {
418 dout("got string token %d val %s\n", token,
420 } else if (token > Opt_last_string && token < Opt_last_bool) {
421 dout("got Boolean token %d\n", token);
423 dout("got token %d\n", token);
428 rbd_opts->read_only = true;
431 rbd_opts->read_only = false;
441 * Get a ceph client with specific addr and configuration, if one does
442 * not exist create it.
444 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
445 size_t mon_addr_len, char *options)
447 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
448 struct ceph_options *ceph_opts;
449 struct rbd_client *rbdc;
451 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
453 ceph_opts = ceph_parse_options(options, mon_addr,
454 mon_addr + mon_addr_len,
455 parse_rbd_opts_token, rbd_opts);
456 if (IS_ERR(ceph_opts))
457 return PTR_ERR(ceph_opts);
459 rbdc = rbd_client_find(ceph_opts);
461 /* using an existing client */
462 ceph_destroy_options(ceph_opts);
464 rbdc = rbd_client_create(ceph_opts);
466 return PTR_ERR(rbdc);
468 rbd_dev->rbd_client = rbdc;
474 * Destroy ceph client
476 * Caller must hold rbd_client_list_lock.
478 static void rbd_client_release(struct kref *kref)
480 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
482 dout("rbd_release_client %p\n", rbdc);
483 spin_lock(&rbd_client_list_lock);
484 list_del(&rbdc->node);
485 spin_unlock(&rbd_client_list_lock);
487 ceph_destroy_client(rbdc->client);
492 * Drop reference to ceph client node. If it's not referenced anymore, release
495 static void rbd_put_client(struct rbd_device *rbd_dev)
497 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
498 rbd_dev->rbd_client = NULL;
502 * Destroy requests collection
504 static void rbd_coll_release(struct kref *kref)
506 struct rbd_req_coll *coll =
507 container_of(kref, struct rbd_req_coll, kref);
509 dout("rbd_coll_release %p\n", coll);
513 static bool rbd_image_format_valid(u32 image_format)
515 return image_format == 1 || image_format == 2;
518 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
523 /* The header has to start with the magic rbd header text */
524 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
528 * The size of a snapshot header has to fit in a size_t, and
529 * that limits the number of snapshots.
531 snap_count = le32_to_cpu(ondisk->snap_count);
532 size = SIZE_MAX - sizeof (struct ceph_snap_context);
533 if (snap_count > size / sizeof (__le64))
537 * Not only that, but the size of the entire the snapshot
538 * header must also be representable in a size_t.
540 size -= snap_count * sizeof (__le64);
541 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
548 * Create a new header structure, translate header format from the on-disk
551 static int rbd_header_from_disk(struct rbd_image_header *header,
552 struct rbd_image_header_ondisk *ondisk)
559 memset(header, 0, sizeof (*header));
561 snap_count = le32_to_cpu(ondisk->snap_count);
563 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
564 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
565 if (!header->object_prefix)
567 memcpy(header->object_prefix, ondisk->object_prefix, len);
568 header->object_prefix[len] = '\0';
571 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
573 /* Save a copy of the snapshot names */
575 if (snap_names_len > (u64) SIZE_MAX)
577 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
578 if (!header->snap_names)
581 * Note that rbd_dev_v1_header_read() guarantees
582 * the ondisk buffer we're working with has
583 * snap_names_len bytes beyond the end of the
584 * snapshot id array, this memcpy() is safe.
586 memcpy(header->snap_names, &ondisk->snaps[snap_count],
589 /* Record each snapshot's size */
591 size = snap_count * sizeof (*header->snap_sizes);
592 header->snap_sizes = kmalloc(size, GFP_KERNEL);
593 if (!header->snap_sizes)
595 for (i = 0; i < snap_count; i++)
596 header->snap_sizes[i] =
597 le64_to_cpu(ondisk->snaps[i].image_size);
599 WARN_ON(ondisk->snap_names_len);
600 header->snap_names = NULL;
601 header->snap_sizes = NULL;
604 header->features = 0; /* No features support in v1 images */
605 header->obj_order = ondisk->options.order;
606 header->crypt_type = ondisk->options.crypt_type;
607 header->comp_type = ondisk->options.comp_type;
609 /* Allocate and fill in the snapshot context */
611 header->image_size = le64_to_cpu(ondisk->image_size);
612 size = sizeof (struct ceph_snap_context);
613 size += snap_count * sizeof (header->snapc->snaps[0]);
614 header->snapc = kzalloc(size, GFP_KERNEL);
618 atomic_set(&header->snapc->nref, 1);
619 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
620 header->snapc->num_snaps = snap_count;
621 for (i = 0; i < snap_count; i++)
622 header->snapc->snaps[i] =
623 le64_to_cpu(ondisk->snaps[i].id);
628 kfree(header->snap_sizes);
629 header->snap_sizes = NULL;
630 kfree(header->snap_names);
631 header->snap_names = NULL;
632 kfree(header->object_prefix);
633 header->object_prefix = NULL;
638 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
641 struct rbd_snap *snap;
643 list_for_each_entry(snap, &rbd_dev->snaps, node) {
644 if (!strcmp(snap_name, snap->name)) {
645 rbd_dev->mapping.snap_id = snap->id;
646 rbd_dev->mapping.size = snap->size;
647 rbd_dev->mapping.features = snap->features;
656 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
660 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
661 sizeof (RBD_SNAP_HEAD_NAME))) {
662 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
663 rbd_dev->mapping.size = rbd_dev->header.image_size;
664 rbd_dev->mapping.features = rbd_dev->header.features;
665 rbd_dev->mapping.snap_exists = false;
666 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
669 ret = snap_by_name(rbd_dev, snap_name);
672 rbd_dev->mapping.snap_exists = true;
673 rbd_dev->mapping.read_only = true;
675 rbd_dev->mapping.snap_name = snap_name;
680 static void rbd_header_free(struct rbd_image_header *header)
682 kfree(header->object_prefix);
683 header->object_prefix = NULL;
684 kfree(header->snap_sizes);
685 header->snap_sizes = NULL;
686 kfree(header->snap_names);
687 header->snap_names = NULL;
688 ceph_put_snap_context(header->snapc);
689 header->snapc = NULL;
692 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
698 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
701 segment = offset >> rbd_dev->header.obj_order;
702 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
703 rbd_dev->header.object_prefix, segment);
704 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
705 pr_err("error formatting segment name for #%llu (%d)\n",
714 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
716 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
718 return offset & (segment_size - 1);
721 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
722 u64 offset, u64 length)
724 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
726 offset &= segment_size - 1;
728 rbd_assert(length <= U64_MAX - offset);
729 if (offset + length > segment_size)
730 length = segment_size - offset;
735 static int rbd_get_num_segments(struct rbd_image_header *header,
743 if (len - 1 > U64_MAX - ofs)
746 start_seg = ofs >> header->obj_order;
747 end_seg = (ofs + len - 1) >> header->obj_order;
749 return end_seg - start_seg + 1;
753 * returns the size of an object in the image
755 static u64 rbd_obj_bytes(struct rbd_image_header *header)
757 return 1 << header->obj_order;
764 static void bio_chain_put(struct bio *chain)
770 chain = chain->bi_next;
776 * zeros a bio chain, starting at specific offset
778 static void zero_bio_chain(struct bio *chain, int start_ofs)
787 bio_for_each_segment(bv, chain, i) {
788 if (pos + bv->bv_len > start_ofs) {
789 int remainder = max(start_ofs - pos, 0);
790 buf = bvec_kmap_irq(bv, &flags);
791 memset(buf + remainder, 0,
792 bv->bv_len - remainder);
793 bvec_kunmap_irq(buf, &flags);
798 chain = chain->bi_next;
/*
 * NOTE(review): this region is a garbled excerpt — many original lines
 * (braces, declarations, the tail-linking of the cloned chain, and the
 * error-return path) are missing, and each remaining line carries a
 * stray leading number from the extraction.  Preserved byte-for-byte;
 * recover the full function from upstream rbd.c before building.
 */
803 * bio_chain_clone - clone a chain of bios up to a certain length.
804 * might return a bio_pair that will need to be released.
806 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
807 struct bio_pair **bp,
808 int len, gfp_t gfpmask)
810 struct bio *old_chain = *old;
811 struct bio *new_chain = NULL;
/* presumably releases a bio_pair left over from a previous call — confirm */
816 bio_pair_release(*bp);
820 while (old_chain && (total < len)) {
823 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
826 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
/* this bio would cross the requested length: split it */
828 if (total + old_chain->bi_size > len) {
832 * this split can only happen with a single paged bio,
833 * split_bio will BUG_ON if this is not the case
835 dout("bio_chain_clone split! total=%d remaining=%d"
837 total, len - total, old_chain->bi_size);
839 /* split the bio. We'll release it either in the next
840 call, or it will have to be released outside */
841 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
845 __bio_clone(tmp, &bp->bio1);
849 __bio_clone(tmp, old_chain);
850 *next = old_chain->bi_next;
860 old_chain = old_chain->bi_next;
862 total += tmp->bi_size;
865 rbd_assert(total == len);
/* error path: drop whatever was cloned so far */
872 dout("bio_chain_clone with err\n");
873 bio_chain_put(new_chain);
878 * helpers for osd request op vectors.
880 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
881 int opcode, u32 payload_len)
883 struct ceph_osd_req_op *ops;
885 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
892 * op extent offset and length will be set later on
893 * in calc_raw_layout()
895 ops[0].payload_len = payload_len;
900 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
905 static void rbd_coll_end_req_index(struct request *rq,
906 struct rbd_req_coll *coll,
910 struct request_queue *q;
913 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
914 coll, index, ret, (unsigned long long) len);
920 blk_end_request(rq, ret, len);
926 spin_lock_irq(q->queue_lock);
927 coll->status[index].done = 1;
928 coll->status[index].rc = ret;
929 coll->status[index].bytes = len;
930 max = min = coll->num_done;
931 while (max < coll->total && coll->status[max].done)
934 for (i = min; i<max; i++) {
935 __blk_end_request(rq, coll->status[i].rc,
936 coll->status[i].bytes);
938 kref_put(&coll->kref, rbd_coll_release);
940 spin_unlock_irq(q->queue_lock);
943 static void rbd_coll_end_req(struct rbd_request *req,
946 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
/*
 * NOTE(review): heavily garbled excerpt of the core OSD-request sender.
 * Missing from this view: most of the parameter list (snapid, flags,
 * pages/num_pages, coll_index, ver), local declarations (ret, bno,
 * flags handling), all error labels, and the success/failure returns.
 * Each surviving line carries a stray leading number from the
 * extraction.  Preserved byte-for-byte; restore from upstream rbd.c.
 */
950 * Send ceph osd request
952 static int rbd_do_request(struct request *rq,
953 struct rbd_device *rbd_dev,
954 struct ceph_snap_context *snapc,
956 const char *object_name, u64 ofs, u64 len,
961 struct ceph_osd_req_op *ops,
962 struct rbd_req_coll *coll,
964 void (*rbd_cb)(struct ceph_osd_request *req,
965 struct ceph_msg *msg),
966 struct ceph_osd_request **linger_req,
969 struct ceph_osd_request *req;
970 struct ceph_file_layout *layout;
973 struct timespec mtime = CURRENT_TIME;
974 struct rbd_request *req_data;
975 struct ceph_osd_request_head *reqhead;
976 struct ceph_osd_client *osdc;
/* allocation failure completes the collection slot with an error */
978 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
981 rbd_coll_end_req_index(rq, coll, coll_index,
987 req_data->coll = coll;
988 req_data->coll_index = coll_index;
991 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
992 (unsigned long long) ofs, (unsigned long long) len);
994 osdc = &rbd_dev->rbd_client->client->osdc;
995 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
996 false, GFP_NOIO, pages, bio);
1002 req->r_callback = rbd_cb;
1005 req_data->bio = bio;
1006 req_data->pages = pages;
1007 req_data->len = len;
1009 req->r_priv = req_data;
1011 reqhead = req->r_request->front.iov_base;
1012 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1014 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1015 req->r_oid_len = strlen(req->r_oid);
/* single-object layout: one stripe, object-sized stripe unit */
1017 layout = &req->r_file_layout;
1018 memset(layout, 0, sizeof(*layout));
1019 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 layout->fl_stripe_count = cpu_to_le32(1);
1021 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1022 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1023 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1025 rbd_assert(ret == 0);
1027 ceph_osdc_build_request(req, ofs, &len,
1031 req->r_oid, req->r_oid_len);
/* caller asked to keep the request lingering (watch) — confirm */
1034 ceph_osdc_set_request_linger(osdc, req);
1038 ret = ceph_osdc_start_request(osdc, req, false);
/* synchronous path: wait and report the reassert version via *ver */
1043 ret = ceph_osdc_wait_request(osdc, req);
1045 *ver = le64_to_cpu(req->r_reassert_version.version);
1046 dout("reassert_ver=%llu\n",
1047 (unsigned long long)
1048 le64_to_cpu(req->r_reassert_version.version));
1049 ceph_osdc_put_request(req);
/* error path: undo references and complete with the error */
1054 bio_chain_put(req_data->bio);
1055 ceph_osdc_put_request(req);
1057 rbd_coll_end_req(req_data, ret, len);
1063 * Ceph osd op callback
1065 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1067 struct rbd_request *req_data = req->r_priv;
1068 struct ceph_osd_reply_head *replyhead;
1069 struct ceph_osd_op *op;
1075 replyhead = msg->front.iov_base;
1076 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1077 op = (void *)(replyhead + 1);
1078 rc = le32_to_cpu(replyhead->result);
1079 bytes = le64_to_cpu(op->extent.length);
1080 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1082 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1083 (unsigned long long) bytes, read_op, (int) rc);
1085 if (rc == -ENOENT && read_op) {
1086 zero_bio_chain(req_data->bio, 0);
1088 } else if (rc == 0 && read_op && bytes < req_data->len) {
1089 zero_bio_chain(req_data->bio, bytes);
1090 bytes = req_data->len;
1093 rbd_coll_end_req(req_data, rc, bytes);
1096 bio_chain_put(req_data->bio);
1098 ceph_osdc_put_request(req);
/* Minimal callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1108 * Do a synchronous ceph osd operation
1110 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1111 struct ceph_snap_context *snapc,
1114 struct ceph_osd_req_op *ops,
1115 const char *object_name,
1116 u64 ofs, u64 inbound_size,
1118 struct ceph_osd_request **linger_req,
1122 struct page **pages;
1125 rbd_assert(ops != NULL);
1127 num_pages = calc_pages_for(ofs, inbound_size);
1128 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1130 return PTR_ERR(pages);
1132 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1133 object_name, ofs, inbound_size, NULL,
1143 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1144 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1147 ceph_release_page_vector(pages, num_pages);
1152 * Do an asynchronous ceph osd operation
1154 static int rbd_do_op(struct request *rq,
1155 struct rbd_device *rbd_dev,
1156 struct ceph_snap_context *snapc,
1158 int opcode, int flags,
1161 struct rbd_req_coll *coll,
1168 struct ceph_osd_req_op *ops;
1171 seg_name = rbd_segment_name(rbd_dev, ofs);
1174 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1177 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1180 ops = rbd_create_rw_ops(1, opcode, payload_len);
1184 /* we've taken care of segment sizes earlier when we
1185 cloned the bios. We should never have a segment
1186 truncated at this point */
1187 rbd_assert(seg_len == len);
1189 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1190 seg_name, seg_ofs, seg_len,
1196 rbd_req_cb, 0, NULL);
1198 rbd_destroy_ops(ops);
1205 * Request async osd write
1207 static int rbd_req_write(struct request *rq,
1208 struct rbd_device *rbd_dev,
1209 struct ceph_snap_context *snapc,
1212 struct rbd_req_coll *coll,
1215 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1217 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218 ofs, len, bio, coll, coll_index);
1222 * Request async osd read
1224 static int rbd_req_read(struct request *rq,
1225 struct rbd_device *rbd_dev,
1229 struct rbd_req_coll *coll,
1232 return rbd_do_op(rq, rbd_dev, NULL,
1236 ofs, len, bio, coll, coll_index);
1240 * Request sync osd read
1242 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1244 const char *object_name,
1249 struct ceph_osd_req_op *ops;
1252 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1256 ret = rbd_req_sync_op(rbd_dev, NULL,
1259 ops, object_name, ofs, len, buf, NULL, ver);
1260 rbd_destroy_ops(ops);
1266 * Request sync osd watch
1268 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1272 struct ceph_osd_req_op *ops;
1275 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1279 ops[0].watch.ver = cpu_to_le64(ver);
1280 ops[0].watch.cookie = notify_id;
1281 ops[0].watch.flag = 0;
1283 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1284 rbd_dev->header_name, 0, 0, NULL,
1289 rbd_simple_req_cb, 0, NULL);
1291 rbd_destroy_ops(ops);
1295 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1297 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1304 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305 rbd_dev->header_name, (unsigned long long) notify_id,
1306 (unsigned int) opcode);
1307 rc = rbd_refresh_header(rbd_dev, &hver);
1309 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310 " update snaps: %d\n", rbd_dev->major, rc);
1312 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1316 * Request sync osd watch
1318 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1320 struct ceph_osd_req_op *ops;
1321 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1324 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1328 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1329 (void *)rbd_dev, &rbd_dev->watch_event);
1333 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1334 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1335 ops[0].watch.flag = 1;
1337 ret = rbd_req_sync_op(rbd_dev, NULL,
1339 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1341 rbd_dev->header_name,
1343 &rbd_dev->watch_request, NULL);
1348 rbd_destroy_ops(ops);
1352 ceph_osdc_cancel_event(rbd_dev->watch_event);
1353 rbd_dev->watch_event = NULL;
1355 rbd_destroy_ops(ops);
1360 * Request sync osd unwatch
1362 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1364 struct ceph_osd_req_op *ops;
1367 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1371 ops[0].watch.ver = 0;
1372 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1373 ops[0].watch.flag = 0;
1375 ret = rbd_req_sync_op(rbd_dev, NULL,
1377 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1379 rbd_dev->header_name,
1380 0, 0, NULL, NULL, NULL);
1383 rbd_destroy_ops(ops);
1384 ceph_osdc_cancel_event(rbd_dev->watch_event);
1385 rbd_dev->watch_event = NULL;
1390 * Synchronous osd object method call
1392 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1393 const char *object_name,
1394 const char *class_name,
1395 const char *method_name,
1396 const char *outbound,
1397 size_t outbound_size,
1399 size_t inbound_size,
1403 struct ceph_osd_req_op *ops;
1404 int class_name_len = strlen(class_name);
1405 int method_name_len = strlen(method_name);
1410 * Any input parameters required by the method we're calling
1411 * will be sent along with the class and method names as
1412 * part of the message payload. That data and its size are
1413 * supplied via the indata and indata_len fields (named from
1414 * the perspective of the server side) in the OSD request
1417 payload_size = class_name_len + method_name_len + outbound_size;
1418 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1422 ops[0].cls.class_name = class_name;
1423 ops[0].cls.class_len = (__u8) class_name_len;
1424 ops[0].cls.method_name = method_name;
1425 ops[0].cls.method_len = (__u8) method_name_len;
1426 ops[0].cls.argc = 0;
1427 ops[0].cls.indata = outbound;
1428 ops[0].cls.indata_len = outbound_size;
1430 ret = rbd_req_sync_op(rbd_dev, NULL,
1433 object_name, 0, inbound_size, inbound,
1436 rbd_destroy_ops(ops);
1438 dout("cls_exec returned %d\n", ret);
1442 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1444 struct rbd_req_coll *coll =
1445 kzalloc(sizeof(struct rbd_req_coll) +
1446 sizeof(struct rbd_req_status) * num_reqs,
1451 coll->total = num_reqs;
1452 kref_init(&coll->kref);
/*
 * NOTE(review): heavily garbled excerpt of the request-queue callback.
 * Missing from this view: local declarations (rq, bio, size, ofs,
 * op_size, do_write), the per-segment inner loop bounds, the continue
 * statements after each __blk_end_request_all(), and the bodies of the
 * write/read dispatch.  Each surviving line carries a stray leading
 * number from the extraction.  Preserved byte-for-byte; restore from
 * upstream rbd.c.
 */
1457 * block device queue callback
1459 static void rbd_rq_fn(struct request_queue *q)
1461 struct rbd_device *rbd_dev = q->queuedata;
1463 struct bio_pair *bp = NULL;
1465 while ((rq = blk_fetch_request(q))) {
1467 struct bio *rq_bio, *next_bio = NULL;
1472 int num_segs, cur_seg = 0;
1473 struct rbd_req_coll *coll;
1474 struct ceph_snap_context *snapc;
1476 dout("fetched request\n");
1478 /* filter out block requests we don't understand */
1479 if ((rq->cmd_type != REQ_TYPE_FS)) {
1480 __blk_end_request_all(rq, 0);
1484 /* deduce our operation (read, write) */
1485 do_write = (rq_data_dir(rq) == WRITE);
1487 size = blk_rq_bytes(rq);
1488 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1490 if (do_write && rbd_dev->mapping.read_only) {
1491 __blk_end_request_all(rq, -EROFS);
/* drop the queue lock while talking to the OSDs */
1495 spin_unlock_irq(q->queue_lock);
1497 down_read(&rbd_dev->header_rwsem);
/* mapped snapshot has been deleted under us */
1499 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1500 !rbd_dev->mapping.snap_exists) {
1501 up_read(&rbd_dev->header_rwsem);
1502 dout("request for non-existent snapshot");
1503 spin_lock_irq(q->queue_lock);
1504 __blk_end_request_all(rq, -ENXIO);
1508 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1510 up_read(&rbd_dev->header_rwsem);
1512 dout("%s 0x%x bytes at 0x%llx\n",
1513 do_write ? "write" : "read",
1514 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1516 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1517 if (num_segs <= 0) {
1518 spin_lock_irq(q->queue_lock);
1519 __blk_end_request_all(rq, num_segs);
1520 ceph_put_snap_context(snapc);
1523 coll = rbd_alloc_coll(num_segs);
1525 spin_lock_irq(q->queue_lock);
1526 __blk_end_request_all(rq, -ENOMEM);
1527 ceph_put_snap_context(snapc);
1532 /* a bio clone to be passed down to OSD req */
1533 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534 op_size = rbd_segment_length(rbd_dev, ofs, size);
1535 kref_get(&coll->kref);
1536 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1537 op_size, GFP_ATOMIC);
/* clone failure completes this segment's slot with an error */
1539 rbd_coll_end_req_index(rq, coll, cur_seg,
1545 /* init OSD command: write or read */
1547 rbd_req_write(rq, rbd_dev,
1553 rbd_req_read(rq, rbd_dev,
1554 rbd_dev->mapping.snap_id,
1566 kref_put(&coll->kref, rbd_coll_release);
1569 bio_pair_release(bp);
1570 spin_lock_irq(q->queue_lock);
1572 ceph_put_snap_context(snapc);
1577 * a queue callback. Makes sure that we don't create a bio that spans across
1578 * multiple osd objects. One exception would be with a single page bios,
1579 * which we handle later at bio_chain_clone
1581 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1582 struct bio_vec *bvec)
1584 struct rbd_device *rbd_dev = q->queuedata;
1585 unsigned int chunk_sectors;
1587 unsigned int bio_sectors;
1590 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1591 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1592 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1594 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1595 + bio_sectors)) << SECTOR_SHIFT;
1597 max = 0; /* bio_add cannot handle a negative return */
1598 if (max <= bvec->bv_len && bio_sectors == 0)
1599 return bvec->bv_len;
1603 static void rbd_free_disk(struct rbd_device *rbd_dev)
1605 struct gendisk *disk = rbd_dev->disk;
1610 if (disk->flags & GENHD_FL_UP)
1613 blk_cleanup_queue(disk->queue);
/*
 * NOTE(review): garbled excerpt of the v1 header reader.  Missing from
 * this view: local declarations (size, snap_count, want_count,
 * names_size, ret), the do { } loop opening, kfree of a previous
 * buffer on re-read, the error-goto labels, and the success return.
 * Each surviving line carries a stray leading number from the
 * extraction.  Preserved byte-for-byte; restore from upstream rbd.c.
 */
1618 * Read the complete header for the given rbd device.
1620 * Returns a pointer to a dynamically-allocated buffer containing
1621 * the complete and validated header. Caller can pass the address
1622 * of a variable that will be filled in with the version of the
1623 * header object at the time it was read.
1625 * Returns a pointer-coded errno if a failure occurs.
1627 static struct rbd_image_header_ondisk *
1628 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1630 struct rbd_image_header_ondisk *ondisk = NULL;
1637 * The complete header will include an array of its 64-bit
1638 * snapshot ids, followed by the names of those snapshots as
1639 * a contiguous block of NUL-terminated strings. Note that
1640 * the number of snapshots could change by the time we read
1641 * it in, in which case we re-read it.
1648 size = sizeof (*ondisk);
1649 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1651 ondisk = kmalloc(size, GFP_KERNEL);
1653 return ERR_PTR(-ENOMEM);
1655 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1656 rbd_dev->header_name,
1658 (char *) ondisk, version);
/* a short read means the header is malformed or changed under us */
1662 if (WARN_ON((size_t) ret < size)) {
1664 pr_warning("short header read for image %s"
1665 " (want %zd got %d)\n",
1666 rbd_dev->image_name, size, ret);
1669 if (!rbd_dev_ondisk_valid(ondisk)) {
1671 pr_warning("invalid header for image %s\n",
1672 rbd_dev->image_name);
/* re-read if the snapshot count changed since we sized the buffer */
1676 names_size = le64_to_cpu(ondisk->snap_names_len);
1677 want_count = snap_count;
1678 snap_count = le32_to_cpu(ondisk->snap_count);
1679 } while (snap_count != want_count);
1686 return ERR_PTR(ret);
1690 * re-read the on-disk header and fill in the in-core image header
/*
 * NOTE(review): braces, the IS_ERR check opening, and the kfree of
 * the ondisk buffer are elided in this excerpt.
 */
1692 static int rbd_read_header(struct rbd_device *rbd_dev,
1693 struct rbd_image_header *header)
1695 struct rbd_image_header_ondisk *ondisk;
1699 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1701 return PTR_ERR(ondisk);
1702 ret = rbd_header_from_disk(header, ondisk);
/* Record the header object version observed at read time */
1704 header->obj_version = ver;
/* Remove every snapshot device hanging off this rbd device's list. */
1710 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1712 struct rbd_snap *snap;
1713 struct rbd_snap *next;
/* _safe variant: __rbd_remove_snap_dev() unlinks the entry we visit */
1715 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1716 __rbd_remove_snap_dev(snap);
/*
 * Propagate a (possibly changed) base-image size to the mapping and
 * the gendisk capacity.  Only applies when the base image (not a
 * snapshot) is mapped.
 */
1719 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
/* Snapshots are immutable; only CEPH_NOSNAP mappings can resize */
1723 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
/* Convert byte count to 512-byte sectors for the block layer */
1726 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1727 dout("setting size to %llu sectors", (unsigned long long) size);
1728 rbd_dev->mapping.size = (u64) size;
1729 set_capacity(rbd_dev->disk, size);
1733 * only read the first part of the ondisk header, without the snaps info
/*
 * Re-read the v1 header and splice the fresh snapshot context, names
 * and sizes into rbd_dev->header under header_rwsem, then refresh the
 * in-core snapshot list.  NOTE(review): error-return lines and some
 * braces are elided in this excerpt.
 */
1735 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1738 struct rbd_image_header h;
1740 ret = rbd_read_header(rbd_dev, &h);
1744 down_write(&rbd_dev->header_rwsem);
1746 /* Update image size, and check for resize of mapped image */
1747 rbd_dev->header.image_size = h.image_size;
1748 rbd_update_mapping_size(rbd_dev);
1750 /* rbd_dev->header.object_prefix shouldn't change */
1751 kfree(rbd_dev->header.snap_sizes);
1752 kfree(rbd_dev->header.snap_names);
1753 /* osd requests may still refer to snapc */
1754 ceph_put_snap_context(rbd_dev->header.snapc);
/* Hand the freshly read fields over to the live header */
1757 *hver = h.obj_version;
1758 rbd_dev->header.obj_version = h.obj_version;
1759 rbd_dev->header.image_size = h.image_size;
1760 rbd_dev->header.snapc = h.snapc;
1761 rbd_dev->header.snap_names = h.snap_names;
1762 rbd_dev->header.snap_sizes = h.snap_sizes;
1763 /* Free the extra copy of the object prefix */
1764 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1765 kfree(h.object_prefix);
/* Reconcile the snapshot list, then register any new snap devices */
1767 ret = rbd_dev_snaps_update(rbd_dev);
1769 ret = rbd_dev_snaps_register(rbd_dev);
1771 up_write(&rbd_dev->header_rwsem);
/* Serialized wrapper: refresh the header while holding ctl_mutex. */
1776 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
/* SINGLE_DEPTH_NESTING: lockdep annotation for nested ctl_mutex use */
1780 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1781 ret = __rbd_refresh_header(rbd_dev, hver);
1782 mutex_unlock(&ctl_mutex);
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device.  The disk is not yet announced (no add_disk() here).
 * NOTE(review): allocation-failure branches and the final return are
 * elided in this excerpt.
 */
1787 static int rbd_init_disk(struct rbd_device *rbd_dev)
1789 struct gendisk *disk;
1790 struct request_queue *q;
1793 /* create gendisk info */
1794 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1798 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1800 disk->major = rbd_dev->major;
1801 disk->first_minor = 0;
1802 disk->fops = &rbd_bd_ops;
1803 disk->private_data = rbd_dev;
1806 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1810 /* We use the default size, but let's be explicit about it. */
1811 blk_queue_physical_block_size(q, SECTOR_SIZE);
1813 /* set io sizes to object size */
1814 segment_size = rbd_obj_bytes(&rbd_dev->header);
1815 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1816 blk_queue_max_segment_size(q, segment_size);
1817 blk_queue_io_min(q, segment_size);
1818 blk_queue_io_opt(q, segment_size);
/* Keep bios from spanning object boundaries (see rbd_merge_bvec) */
1820 blk_queue_merge_bvec(q, rbd_merge_bvec);
1823 q->queuedata = rbd_dev;
1825 rbd_dev->disk = disk;
/* Capacity is tracked in 512-byte sectors */
1827 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* Map a sysfs struct device back to its containing rbd_device. */
1840 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1842 return container_of(dev, struct rbd_device, dev);
/* sysfs "size" attribute: mapped size in bytes. */
1845 static ssize_t rbd_size_show(struct device *dev,
1846 struct device_attribute *attr, char *buf)
1848 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Read capacity under header_rwsem so a concurrent resize can't race */
1851 down_read(&rbd_dev->header_rwsem);
1852 size = get_capacity(rbd_dev->disk);
1853 up_read(&rbd_dev->header_rwsem);
/* get_capacity() is in sectors; convert to bytes for userspace */
1855 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1859 * Note this shows the features for whatever's mapped, which is not
1860 * necessarily the base image.
/* sysfs "features" attribute: feature bits of the mapped image/snap. */
1862 static ssize_t rbd_features_show(struct device *dev,
1863 struct device_attribute *attr, char *buf)
1865 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1867 return sprintf(buf, "0x%016llx\n",
1868 (unsigned long long) rbd_dev->mapping.features);
/* sysfs "major" attribute: block major number for this device. */
1871 static ssize_t rbd_major_show(struct device *dev,
1872 struct device_attribute *attr, char *buf)
1874 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1876 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: ceph client instance id. */
1879 static ssize_t rbd_client_id_show(struct device *dev,
1880 struct device_attribute *attr, char *buf)
1882 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1884 return sprintf(buf, "client%lld\n",
1885 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool" attribute: name of the rados pool backing the image. */
1888 static ssize_t rbd_pool_show(struct device *dev,
1889 struct device_attribute *attr, char *buf)
1891 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1893 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id" attribute: numeric id of the backing pool. */
1896 static ssize_t rbd_pool_id_show(struct device *dev,
1897 struct device_attribute *attr, char *buf)
1899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name" attribute: user-visible rbd image name. */
1904 static ssize_t rbd_name_show(struct device *dev,
1905 struct device_attribute *attr, char *buf)
1907 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1909 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "image_id" attribute: internal image id (empty for v1 images). */
1912 static ssize_t rbd_image_id_show(struct device *dev,
1913 struct device_attribute *attr, char *buf)
1915 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1917 return sprintf(buf, "%s\n", rbd_dev->image_id);
1921 * Shows the name of the currently-mapped snapshot (or
1922 * RBD_SNAP_HEAD_NAME for the base image).
1924 static ssize_t rbd_snap_show(struct device *dev,
1925 struct device_attribute *attr,
1928 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1930 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
/* sysfs "refresh" store: force a header re-read for this device. */
1933 static ssize_t rbd_image_refresh(struct device *dev,
1934 struct device_attribute *attr,
1938 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1941 ret = rbd_refresh_header(rbd_dev, NULL);
/* A store must return the byte count consumed on success */
1943 return ret < 0 ? ret : size;
/* Per-device sysfs attributes; all read-only except "refresh". */
1946 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1947 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1948 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1949 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1950 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1951 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1952 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1953 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1954 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1955 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
/*
 * Attribute table and device type wiring for the rbd device sysfs
 * directory.  NOTE(review): the NULL array terminator, the group
 * initializers, and the empty release body are elided in this excerpt.
 */
1957 static struct attribute *rbd_attrs[] = {
1958 &dev_attr_size.attr,
1959 &dev_attr_features.attr,
1960 &dev_attr_major.attr,
1961 &dev_attr_client_id.attr,
1962 &dev_attr_pool.attr,
1963 &dev_attr_pool_id.attr,
1964 &dev_attr_name.attr,
1965 &dev_attr_image_id.attr,
1966 &dev_attr_current_snap.attr,
1967 &dev_attr_refresh.attr,
1971 static struct attribute_group rbd_attr_group = {
1975 static const struct attribute_group *rbd_attr_groups[] = {
/* Release is a no-op; rbd_dev lifetime is managed elsewhere -- verify */
1980 static void rbd_sysfs_dev_release(struct device *dev)
1984 static struct device_type rbd_device_type = {
1986 .groups = rbd_attr_groups,
1987 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size" attribute: snapshot size in bytes. */
1995 static ssize_t rbd_snap_size_show(struct device *dev,
1996 struct device_attribute *attr,
1999 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id" attribute: numeric snapshot id. */
2004 static ssize_t rbd_snap_id_show(struct device *dev,
2005 struct device_attribute *attr,
2008 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2010 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs "snap_features" attribute: feature bits of this snapshot. */
2013 static ssize_t rbd_snap_features_show(struct device *dev,
2014 struct device_attribute *attr,
2017 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2019 return sprintf(buf, "0x%016llx\n",
2020 (unsigned long long) snap->features);
/* Per-snapshot sysfs attributes, all read-only. */
2023 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2024 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2025 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
/* NOTE(review): the NULL terminator line of this array is elided */
2027 static struct attribute *rbd_snap_attrs[] = {
2028 &dev_attr_snap_size.attr,
2029 &dev_attr_snap_id.attr,
2030 &dev_attr_snap_features.attr,
2034 static struct attribute_group rbd_snap_attr_group = {
2035 .attrs = rbd_snap_attrs,
/*
 * Device-model release and type for snapshot devices.
 * NOTE(review): the release body (presumably kfree of name and snap)
 * and the NULL group terminator are elided in this excerpt.
 */
2038 static void rbd_snap_dev_release(struct device *dev)
2040 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2045 static const struct attribute_group *rbd_snap_attr_groups[] = {
2046 &rbd_snap_attr_group,
2050 static struct device_type rbd_snap_device_type = {
2051 .groups = rbd_snap_attr_groups,
2052 .release = rbd_snap_dev_release,
/*
 * True if this snapshot's device has been registered with the device
 * model.  The type is only ever set together with registration, so the
 * two indicators must agree -- asserted below.
 */
2055 static bool rbd_snap_registered(struct rbd_snap *snap)
2057 bool ret = snap->dev.type == &rbd_snap_device_type;
2058 bool reg = device_is_registered(&snap->dev);
/* !ret ^ reg is true only when ret == reg (both set or both clear) */
2060 rbd_assert(!ret ^ reg);
/* Unlink a snapshot from the device list and unregister its device. */
2065 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2067 list_del(&snap->node);
/* A snap may be on the list before its device is registered */
2068 if (device_is_registered(&snap->dev))
2069 device_unregister(&snap->dev);
/*
 * Register a snapshot with the device model, as a child of @parent
 * (the rbd device), named "snap_<name>".
 */
2072 static int rbd_register_snap_dev(struct rbd_snap *snap,
2073 struct device *parent)
2075 struct device *dev = &snap->dev;
2078 dev->type = &rbd_snap_device_type;
2079 dev->parent = parent;
2080 dev->release = rbd_snap_dev_release;
2081 dev_set_name(dev, "snap_%s", snap->name);
2082 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2084 ret = device_register(dev);
/*
 * Allocate and initialize a new rbd_snap from the given name, id,
 * size and features.  Returns the new snap or a pointer-coded errno.
 * NOTE(review): the name-allocation failure path, id assignment and
 * the err label are elided in this excerpt.
 */
2089 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2090 const char *snap_name,
2091 u64 snap_id, u64 snap_size,
2094 struct rbd_snap *snap;
2097 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2099 return ERR_PTR(-ENOMEM);
/* Keep our own copy of the name; caller's buffer may go away */
2102 snap->name = kstrdup(snap_name, GFP_KERNEL);
2107 snap->size = snap_size;
2108 snap->features = snap_features;
2116 return ERR_PTR(ret);
/*
 * Look up size, features and name for snapshot @which in a v1 header.
 * Names are stored as consecutive NUL-terminated strings, so we walk
 * the name block to the wanted entry.  NOTE(review): the walk loop
 * body and final return are elided in this excerpt.
 */
2119 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2120 u64 *snap_size, u64 *snap_features)
2124 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2126 *snap_size = rbd_dev->header.snap_sizes[which];
2127 *snap_features = 0; /* No features for v1 */
2129 /* Skip over names until we find the one we are looking for */
2131 snap_name = rbd_dev->header.snap_names;
2133 snap_name += strlen(snap_name) + 1;
2139 * Get the size and object order for an image snapshot, or if
2140 * snap_id is CEPH_NOSNAP, gets this information for the base
/*
 * Issues the "get_size" class method against the image header object
 * and decodes the (order, size) reply.  NOTE(review): the reply-struct
 * fields, error checks and final return are elided in this excerpt.
 */
2143 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2144 u8 *order, u64 *snap_size)
2146 __le64 snapid = cpu_to_le64(snap_id);
/* packed: layout must match the on-wire reply exactly */
2151 } __attribute__ ((packed)) size_buf = { 0 };
2153 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2155 (char *) &snapid, sizeof (snapid),
2156 (char *) &size_buf, sizeof (size_buf),
2157 CEPH_OSD_FLAG_READ, NULL);
2158 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2162 *order = size_buf.order;
2163 *snap_size = le64_to_cpu(size_buf.size);
2165 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2166 (unsigned long long) snap_id, (unsigned int) *order,
2167 (unsigned long long) *snap_size);
/* Fetch base-image size/order (CEPH_NOSNAP = no snapshot) into header. */
2172 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2174 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2175 &rbd_dev->header.obj_order,
2176 &rbd_dev->header.image_size);
/*
 * Fetch the object prefix (data-object name stem) for a v2 image via
 * the "get_object_prefix" class method and decode the encoded string.
 * NOTE(review): error checks, the kfree of reply_buf and the final
 * return are elided in this excerpt.
 */
2179 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2185 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2189 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2190 "rbd", "get_object_prefix",
2192 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2193 CEPH_OSD_FLAG_READ, NULL);
2194 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* Reply is a length-prefixed string; extract a NUL-terminated copy */
2199 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2200 p + RBD_OBJ_PREFIX_LEN_MAX,
2203 if (IS_ERR(rbd_dev->header.object_prefix)) {
2204 ret = PTR_ERR(rbd_dev->header.object_prefix);
/* Leave the field NULL on failure so cleanup paths can kfree safely */
2205 rbd_dev->header.object_prefix = NULL;
2207 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for @snap_id (CEPH_NOSNAP = base image) via
 * the "get_features" class method.  NOTE(review): the reply-struct
 * fields, error checks and final return are elided in this excerpt.
 */
2216 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2219 __le64 snapid = cpu_to_le64(snap_id);
2223 } features_buf = { 0 };
2226 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2227 "rbd", "get_features",
2228 (char *) &snapid, sizeof (snapid),
2229 (char *) &features_buf, sizeof (features_buf),
2230 CEPH_OSD_FLAG_READ, NULL);
2231 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2234 *snap_features = le64_to_cpu(features_buf.features);
/* incompat features are logged here but not otherwise acted on */
2236 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2237 (unsigned long long) snap_id,
2238 (unsigned long long) *snap_features,
2239 (unsigned long long) le64_to_cpu(features_buf.incompat));
/* Fetch base-image feature bits into the in-core header. */
2244 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2246 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2247 &rbd_dev->header.features);
/*
 * Fetch and decode the snapshot context (seq + snapshot id array) for
 * a v2 image via "get_snapcontext", building a ceph_snap_context and
 * installing it in the header.  NOTE(review): several declarations,
 * error branches, the seq assignment and the out label are elided in
 * this excerpt.
 */
2250 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2259 struct ceph_snap_context *snapc;
2263 * We'll need room for the seq value (maximum snapshot id),
2264 * snapshot count, and array of that many snapshot ids.
2265 * For now we have a fixed upper limit on the number we're
2266 * prepared to receive.
2268 size = sizeof (__le64) + sizeof (__le32) +
2269 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2270 reply_buf = kzalloc(size, GFP_KERNEL);
2274 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2275 "rbd", "get_snapcontext",
2278 CEPH_OSD_FLAG_READ, ver);
2279 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* _safe decoders jump to the out label on a short buffer */
2285 end = (char *) reply_buf + size;
2286 ceph_decode_64_safe(&p, end, seq, out);
2287 ceph_decode_32_safe(&p, end, snap_count, out);
2290 * Make sure the reported number of snapshot ids wouldn't go
2291 * beyond the end of our buffer. But before checking that,
2292 * make sure the computed size of the snapshot context we
2293 * allocate is representable in a size_t.
2295 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2300 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
/* ceph_snap_context has a flexible id array at its tail */
2303 size = sizeof (struct ceph_snap_context) +
2304 snap_count * sizeof (snapc->snaps[0]);
2305 snapc = kmalloc(size, GFP_KERNEL);
2311 atomic_set(&snapc->nref, 1);
2313 snapc->num_snaps = snap_count;
2314 for (i = 0; i < snap_count; i++)
2315 snapc->snaps[i] = ceph_decode_64(&p);
2317 rbd_dev->header.snapc = snapc;
2319 dout(" snap context seq = %llu, snap_count = %u\n",
2320 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * Fetch the name of snapshot @which (an index into the snap context)
 * via "get_snapshot_name".  Returns a newly-allocated string or a
 * pointer-coded errno.  NOTE(review): error branches, the success
 * return and the reply_buf kfree are elided in this excerpt.
 */
2328 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2336 size_t snap_name_len;
/* Reply is a length-prefixed string of bounded size */
2339 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2340 reply_buf = kmalloc(size, GFP_KERNEL);
2342 return ERR_PTR(-ENOMEM);
2344 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2345 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2346 "rbd", "get_snapshot_name",
2347 (char *) &snap_id, sizeof (snap_id),
2349 CEPH_OSD_FLAG_READ, NULL);
2350 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2355 end = (char *) reply_buf + size;
2357 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2359 if (IS_ERR(snap_name)) {
2360 ret = PTR_ERR(snap_name);
2363 dout(" snap_id 0x%016llx snap_name = %s\n",
2364 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2372 return ERR_PTR(ret);
/*
 * v2 counterpart of rbd_dev_v1_snap_info(): gather size, features and
 * name for snapshot @which with three class-method calls.
 */
2375 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2376 u64 *snap_size, u64 *snap_features)
2382 snap_id = rbd_dev->header.snapc->snaps[which];
2383 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2385 return ERR_PTR(ret);
2386 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2388 return ERR_PTR(ret);
/* Name is fetched last; its allocation is what the caller must free */
2390 return rbd_dev_v2_snap_name(rbd_dev, which);
/* Dispatch snapshot-info lookup by on-disk image format (1 or 2). */
2393 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2394 u64 *snap_size, u64 *snap_features)
2396 if (rbd_dev->image_format == 1)
2397 return rbd_dev_v1_snap_info(rbd_dev, which,
2398 snap_size, snap_features);
2399 if (rbd_dev->image_format == 2)
2400 return rbd_dev_v2_snap_info(rbd_dev, which,
2401 snap_size, snap_features);
/* Unknown format: should be unreachable after a successful probe */
2402 return ERR_PTR(-EINVAL);
2406 * Scan the rbd device's current snapshot list and compare it to the
2407 * newly-received snapshot context. Remove any existing snapshots
2408 * not present in the new snapshot context. Add a new snapshot for
2409 * any snapshots in the snapshot context not in the current list.
2410 * And verify there are no changes to snapshots we already know
2413 * Assumes the snapshots in the snapshot context are sorted by
2414 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2415 * are also maintained in that order.)
/*
 * NOTE(review): this is a two-pointer merge of the sorted context
 * array against the sorted list.  Several interior lines (index
 * declarations, list advancement, error unwinding) are elided in
 * this excerpt.
 */
2417 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2419 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2420 const u32 snap_count = snapc->num_snaps;
2421 struct list_head *head = &rbd_dev->snaps;
2422 struct list_head *links = head->next;
2425 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
/* Walk both sequences until the context array and list are exhausted */
2426 while (index < snap_count || links != head) {
2428 struct rbd_snap *snap;
2431 u64 snap_features = 0;
/* CEPH_NOSNAP acts as a sentinel once the context array runs out */
2433 snap_id = index < snap_count ? snapc->snaps[index]
2435 snap = links != head ? list_entry(links, struct rbd_snap, node)
2437 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2439 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2440 struct list_head *next = links->next;
2442 /* Existing snapshot not in the new snap context */
/* If it was the mapped snapshot, mark the mapping stale */
2444 if (rbd_dev->mapping.snap_id == snap->id)
2445 rbd_dev->mapping.snap_exists = false;
2446 __rbd_remove_snap_dev(snap);
2447 dout("%ssnap id %llu has been removed\n",
2448 rbd_dev->mapping.snap_id == snap->id ?
2450 (unsigned long long) snap->id);
2452 /* Done with this list entry; advance */
2458 snap_name = rbd_dev_snap_info(rbd_dev, index,
2459 &snap_size, &snap_features);
2460 if (IS_ERR(snap_name))
2461 return PTR_ERR(snap_name);
2463 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2464 (unsigned long long) snap_id);
2465 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2466 struct rbd_snap *new_snap;
2468 /* We haven't seen this snapshot before */
2470 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2471 snap_id, snap_size, snap_features);
2472 if (IS_ERR(new_snap)) {
2473 int err = PTR_ERR(new_snap);
2475 dout(" failed to add dev, error %d\n", err);
2480 /* New goes before existing, or at end of list */
/* NOTE(review): the " at end\n" arg embeds a second newline
 * inside a format already ending in \n -- double blank line
 * in debug output; harmless but looks unintended. */
2482 dout(" added dev%s\n", snap ? "" : " at end\n");
2484 list_add_tail(&new_snap->node, &snap->node);
2486 list_add_tail(&new_snap->node, head);
2488 /* Already have this one */
2490 dout(" already present\n");
/* Known snapshots are immutable; any change is a bug */
2492 rbd_assert(snap->size == snap_size);
2493 rbd_assert(!strcmp(snap->name, snap_name));
2494 rbd_assert(snap->features == snap_features);
2496 /* Done with this list entry; advance */
2498 links = links->next;
2501 /* Advance to the next entry in the snapshot context */
2505 dout("%s: done\n", __func__);
2511 * Scan the list of snapshots and register the devices for any that
2512 * have not already been registered.
2514 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2516 struct rbd_snap *snap;
2519 dout("%s called\n", __func__);
/* Snapshot devices must be children of a registered rbd device */
2520 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2523 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2524 if (!rbd_snap_registered(snap)) {
2525 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2530 dout("%s: returning %d\n", __func__, ret);
/*
 * Register the rbd device on the rbd bus under ctl_mutex.  The sysfs
 * name is the numeric device id.
 */
2535 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2540 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2542 dev = &rbd_dev->dev;
2543 dev->bus = &rbd_bus_type;
2544 dev->type = &rbd_device_type;
2545 dev->parent = &rbd_root_dev;
2546 dev->release = rbd_dev_release;
2547 dev_set_name(dev, "%d", rbd_dev->dev_id);
2548 ret = device_register(dev);
2550 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device; rbd_dev_release() handles the teardown. */
2555 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2557 device_unregister(&rbd_dev->dev);
/*
 * Establish the OSD watch on the header object, retrying after a
 * header refresh while the watch call keeps returning -ERANGE.
 * NOTE(review): the do-loop opening and final return are elided.
 */
2560 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2565 ret = rbd_req_sync_watch(rbd_dev);
2566 if (ret == -ERANGE) {
2567 rc = rbd_refresh_header(rbd_dev, NULL);
2571 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1 (see below). */
2576 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2579 * Get a unique rbd identifier for the given new rbd_dev, and add
2580 * the rbd_dev to the global list. The minimum rbd id is 1.
2582 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
/* atomic inc-and-return makes id allocation lock-free */
2584 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2586 spin_lock(&rbd_dev_list_lock);
2587 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2588 spin_unlock(&rbd_dev_list_lock);
2589 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2590 (unsigned long long) rbd_dev->dev_id);
2594 * Remove an rbd_dev from the global list, and record that its
2595 * identifier is no longer in use.
/*
 * NOTE(review): max_id initialization and the loop's max update are
 * elided in this excerpt.
 */
2597 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2599 struct list_head *tmp;
2600 int rbd_id = rbd_dev->dev_id;
2603 rbd_assert(rbd_id > 0);
2605 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2606 (unsigned long long) rbd_dev->dev_id);
2607 spin_lock(&rbd_dev_list_lock);
2608 list_del_init(&rbd_dev->node);
2611 * If the id being "put" is not the current maximum, there
2612 * is nothing special we need to do.
2614 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2615 spin_unlock(&rbd_dev_list_lock);
2620 * We need to update the current maximum id. Search the
2621 * list to find out what it is. We're more likely to find
2622 * the maximum at the end, so search the list backward.
2625 list_for_each_prev(tmp, &rbd_dev_list) {
2626 struct rbd_device *rbd_dev;
2628 rbd_dev = list_entry(tmp, struct rbd_device, node);
2629 if (rbd_id > max_id)
2632 spin_unlock(&rbd_dev_list_lock);
2635 * The max id could have been updated by rbd_dev_id_get(), in
2636 * which case it now accurately reflects the new maximum.
2637 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers the max if nobody raced past us meanwhile */
2640 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2641 dout(" max dev id has been reset\n");
2645 * Skips over white space at *buf, and updates *buf to point to the
2646 * first found non-space character (if any). Returns the length of
2647 * the token (string of non-white space characters) found. Note
2648 * that *buf must be terminated with '\0'.
2650 static inline size_t next_token(const char **buf)
2653 * These are the characters that produce nonzero for
2654 * isspace() in the "C" and "POSIX" locales.
2656 const char *spaces = " \f\n\r\t\v";
2658 *buf += strspn(*buf, spaces); /* Find start of token */
2660 return strcspn(*buf, spaces); /* Return token length */
2664 * Finds the next token in *buf, and if the provided token buffer is
2665 * big enough, copies the found token into it. The result, if
2666 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2667 * must be terminated with '\0' on entry.
2669 * Returns the length of the token found (not including the '\0').
2670 * Return value will be 0 if no token is found, and it will be >=
2671 * token_size if the token would not fit.
2673 * The *buf pointer will be updated to point beyond the end of the
2674 * found token. Note that this occurs even if the token buffer is
2675 * too small to hold it.
2677 static inline size_t copy_token(const char **buf,
2683 len = next_token(buf);
/* Only copy when it fits with room for the terminating NUL */
2684 if (len < token_size) {
2685 memcpy(token, *buf, len);
2686 *(token + len) = '\0';
2694 * Finds the next token in *buf, dynamically allocates a buffer big
2695 * enough to hold a copy of it, and copies the token into the new
2696 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2697 * that a duplicate buffer is created even for a zero-length token.
2699 * Returns a pointer to the newly-allocated duplicate, or a null
2700 * pointer if memory for the duplicate was not available. If
2701 * the lenp argument is a non-null pointer, the length of the token
2702 * (not including the '\0') is returned in *lenp.
2704 * If successful, the *buf pointer will be updated to point beyond
2705 * the end of the found token.
2707 * Note: uses GFP_KERNEL for allocation.
2709 static inline char *dup_token(const char **buf, size_t *lenp)
2714 len = next_token(buf);
/* +1 for the terminating NUL */
2715 dup = kmalloc(len + 1, GFP_KERNEL);
2719 memcpy(dup, *buf, len);
2720 *(dup + len) = '\0';
2730 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2731 * rbd_md_name, and name fields of the given rbd_dev, based on the
2732 * list of monitor addresses and other options provided via
2733 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2734 * copy of the snapshot name to map if successful, or a
2735 * pointer-coded error otherwise.
2737 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * Token order on the add line: mon_addrs, options, pool, image,
 * [snapshot].  NOTE(review): several error branches and the out
 * labels are elided in this excerpt.
 */
2739 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2741 const char **mon_addrs,
2742 size_t *mon_addrs_size,
2744 size_t options_size)
2747 char *err_ptr = ERR_PTR(-EINVAL);
2750 /* The first four tokens are required */
2752 len = next_token(&buf);
/* +1 accounts for the separating/terminating byte */
2755 *mon_addrs_size = len + 1;
2760 len = copy_token(&buf, options, options_size);
2761 if (!len || len >= options_size)
/* From here on failures are allocation failures */
2764 err_ptr = ERR_PTR(-ENOMEM);
2765 rbd_dev->pool_name = dup_token(&buf, NULL);
2766 if (!rbd_dev->pool_name)
2769 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2770 if (!rbd_dev->image_name)
2773 /* Snapshot name is optional */
2774 len = next_token(&buf);
2776 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2777 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2779 snap_name = kmalloc(len + 1, GFP_KERNEL);
2782 memcpy(snap_name, buf, len);
2783 *(snap_name + len) = '\0';
2785 dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
/* Error unwinding: release everything allocated above */
2790 kfree(rbd_dev->image_name);
2791 rbd_dev->image_name = NULL;
2792 rbd_dev->image_name_len = 0;
2793 kfree(rbd_dev->pool_name);
2794 rbd_dev->pool_name = NULL;
2800 * An rbd format 2 image has a unique identifier, distinct from the
2801 * name given to it by the user. Internally, that identifier is
2802 * what's used to specify the names of objects related to the image.
2804 * A special "rbd id" object is used to map an rbd image name to its
2805 * id. If that object doesn't exist, then there is no v2 rbd image
2806 * with the supplied name.
2808 * This function will record the given rbd_dev's image_id field if
2809 * it can be determined, and in that case will return 0. If any
2810 * errors occur a negative errno will be returned and the rbd_dev's
2811 * image_id field will be unchanged (and should be NULL).
/*
 * NOTE(review): allocation-failure branches, the class-method name
 * arguments, and the out labels (kfree of buffers) are elided in
 * this excerpt.
 */
2813 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2822 * First, see if the format 2 image id file exists, and if
2823 * so, get the image's persistent id from it.
/* "<RBD_ID_PREFIX><image_name>" names the id object */
2825 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2826 object_name = kmalloc(size, GFP_NOIO);
2829 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2830 dout("rbd id object name is %s\n", object_name);
2832 /* Response will be an encoded string, which includes a length */
2834 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2835 response = kzalloc(size, GFP_NOIO);
2841 ret = rbd_req_sync_exec(rbd_dev, object_name,
2844 response, RBD_IMAGE_ID_LEN_MAX,
2845 CEPH_OSD_FLAG_READ, NULL);
2846 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2851 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2852 p + RBD_IMAGE_ID_LEN_MAX,
2853 &rbd_dev->image_id_len,
2855 if (IS_ERR(rbd_dev->image_id)) {
2856 ret = PTR_ERR(rbd_dev->image_id);
/* Keep the field NULL on failure, per the contract above */
2857 rbd_dev->image_id = NULL;
2859 dout("image_id is %s\n", rbd_dev->image_id);
/*
 * Complete the probe of a format 1 image: set the (empty) image id,
 * build the "<name><RBD_SUFFIX>" header object name, and read the
 * v1 header.  On failure everything allocated here is released.
 * NOTE(review): some error-branch lines and the success return are
 * elided in this excerpt.
 */
2868 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2873 /* Version 1 images have no id; empty string is used */
2875 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2876 if (!rbd_dev->image_id)
2878 rbd_dev->image_id_len = 0;
2880 /* Record the header object name for this rbd image. */
2882 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2883 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2884 if (!rbd_dev->header_name) {
2888 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2890 /* Populate rbd image metadata */
2892 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2895 rbd_dev->image_format = 1;
2897 dout("discovered version 1 image, header name is %s\n",
2898 rbd_dev->header_name);
/* Error unwinding: NULL the fields so later cleanup is idempotent */
2903 kfree(rbd_dev->header_name);
2904 rbd_dev->header_name = NULL;
2905 kfree(rbd_dev->image_id);
2906 rbd_dev->image_id = NULL;
/*
 * Complete the probe of a format 2 image: build the header object
 * name from the image id, then fetch size/order, object prefix,
 * features, and the snapshot context.  NOTE(review): error-branch
 * lines and the success return are elided in this excerpt.
 */
2911 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2918 * Image id was filled in by the caller. Record the header
2919 * object name for this rbd image.
/* v2 header objects are named "<RBD_HEADER_PREFIX><image_id>" */
2921 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2922 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2923 if (!rbd_dev->header_name)
2925 sprintf(rbd_dev->header_name, "%s%s",
2926 RBD_HEADER_PREFIX, rbd_dev->image_id);
2928 /* Get the size and object order for the image */
2930 ret = rbd_dev_v2_image_size(rbd_dev);
2934 /* Get the object prefix (a.k.a. block_name) for the image */
2936 ret = rbd_dev_v2_object_prefix(rbd_dev);
2940 /* Get the features for the image */
2942 ret = rbd_dev_v2_features(rbd_dev);
2946 /* crypto and compression type aren't (yet) supported for v2 images */
2948 rbd_dev->header.crypt_type = 0;
2949 rbd_dev->header.comp_type = 0;
2951 /* Get the snapshot context, plus the header version */
2953 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2956 rbd_dev->header.obj_version = ver;
2958 rbd_dev->image_format = 2;
2960 dout("discovered version 2 image, header name is %s\n",
2961 rbd_dev->header_name);
/* Error unwinding: release name and prefix, NULL for idempotence */
2965 kfree(rbd_dev->header_name);
2966 rbd_dev->header_name = NULL;
2967 kfree(rbd_dev->header.object_prefix);
2968 rbd_dev->header.object_prefix = NULL;
2974 * Probe for the existence of the header object for the given rbd
2975 * device. For format 2 images this includes determining the image
2978 static int rbd_dev_probe(struct rbd_device *rbd_dev)
2983 * Get the id from the image id object. If it's not a
2984 * format 2 image, we'll get ENOENT back, and we'll assume
2985 * it's a format 1 image.
2987 ret = rbd_dev_image_id(rbd_dev);
/* v1 if the id object is absent, v2 if an id was found */
2989 ret = rbd_dev_v1_probe(rbd_dev);
2991 ret = rbd_dev_v2_probe(rbd_dev);
2993 dout("probe failed, returning %d\n", ret);
/*
 * rbd_add() -- sysfs bus "add" attribute store handler.
 *
 * Parses the user-supplied add string (monitor addresses, options, pool,
 * image, snapshot), connects a ceph client, probes the image, and brings
 * up the block device.  On failure it unwinds via goto labels (the label
 * lines themselves are elided in this excerpt).
 *
 * NOTE(review): excerpt is elided — error-check lines and unwind labels
 * between the visible statements are missing; code left byte-identical.
 */
2998 static ssize_t rbd_add(struct bus_type *bus,
3003 struct rbd_device *rbd_dev = NULL;
3004 const char *mon_addrs = NULL;
3005 size_t mon_addrs_size = 0;
3006 struct ceph_osd_client *osdc;
/* Pin the module for the lifetime of the mapped device. */
3010 if (!try_module_get(THIS_MODULE))
3013 options = kmalloc(count, GFP_KERNEL);
3016 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3020 /* static rbd_device initialization */
3021 spin_lock_init(&rbd_dev->lock);
3022 INIT_LIST_HEAD(&rbd_dev->node);
3023 INIT_LIST_HEAD(&rbd_dev->snaps);
3024 init_rwsem(&rbd_dev->header_rwsem);
3026 /* parse add command */
3027 snap_name = rbd_add_parse_args(rbd_dev, buf,
3028 &mon_addrs, &mon_addrs_size, options, count);
3029 if (IS_ERR(snap_name)) {
3030 rc = PTR_ERR(snap_name);
/* mon_addrs_size - 1: presumably excludes a terminator — TODO confirm
 * against rbd_add_parse_args(), which is not visible here. */
3034 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* Resolve the pool name to a pool id via the current osdmap. */
3039 osdc = &rbd_dev->rbd_client->client->osdc;
3040 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3042 goto err_out_client;
3043 rbd_dev->pool_id = rc;
/* Determine image format (v1/v2) and fetch its header metadata. */
3045 rc = rbd_dev_probe(rbd_dev);
3047 goto err_out_client;
3048 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3050 /* no need to lock here, as rbd_dev is not registered yet */
3051 rc = rbd_dev_snaps_update(rbd_dev);
3053 goto err_out_header;
3055 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3057 goto err_out_header;
3059 /* generate unique id: find highest unique id, add one */
3060 rbd_dev_id_get(rbd_dev);
3062 /* Fill in the device name, now that we have its id. */
/* Compile-time guarantee that "rbd<N>" always fits in the name buffer. */
3063 BUILD_BUG_ON(DEV_NAME_LEN
3064 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3065 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3067 /* Get our block major device number. */
/* register_blkdev(0, ...) asks the kernel to allocate a major number. */
3069 rc = register_blkdev(0, rbd_dev->name);
3072 rbd_dev->major = rc;
3074 /* Set up the blkdev mapping. */
3076 rc = rbd_init_disk(rbd_dev);
3078 goto err_out_blkdev;
3080 rc = rbd_bus_add_dev(rbd_dev);
3085 * At this point cleanup in the event of an error is the job
3086 * of the sysfs code (initiated by rbd_bus_del_dev()).
3089 down_write(&rbd_dev->header_rwsem);
3090 rc = rbd_dev_snaps_register(rbd_dev);
3091 up_write(&rbd_dev->header_rwsem);
3095 rc = rbd_init_watch_dev(rbd_dev);
3099 /* Everything's ready. Announce the disk to the world. */
3101 add_disk(rbd_dev->disk);
3103 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3104 (unsigned long long) rbd_dev->mapping.size);
/* ---- error unwind (labels elided in this excerpt), reverse order ---- */
3109 /* this will also clean up rest of rbd_dev stuff */
3111 rbd_bus_del_dev(rbd_dev);
3116 rbd_free_disk(rbd_dev);
3118 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3120 rbd_dev_id_put(rbd_dev);
3122 rbd_header_free(&rbd_dev->header);
3124 kfree(rbd_dev->header_name);
3125 rbd_put_client(rbd_dev);
3126 kfree(rbd_dev->image_id);
3128 kfree(rbd_dev->mapping.snap_name);
3129 kfree(rbd_dev->image_name);
3130 kfree(rbd_dev->pool_name);
3135 dout("Error adding device %s\n", buf);
3136 module_put(THIS_MODULE);
3138 return (ssize_t) rc;
/*
 * __rbd_get_dev() -- look up an rbd_device by its dev_id on the global
 * rbd_dev_list, under rbd_dev_list_lock.
 *
 * NOTE(review): the return statements are elided in this excerpt;
 * presumably it returns the matching device or NULL — confirm upstream.
 */
3141 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3143 struct list_head *tmp;
3144 struct rbd_device *rbd_dev;
3146 spin_lock(&rbd_dev_list_lock);
3147 list_for_each(tmp, &rbd_dev_list) {
3148 rbd_dev = list_entry(tmp, struct rbd_device, node);
/* Found a match: drop the lock before returning it to the caller. */
3149 if (rbd_dev->dev_id == dev_id) {
3150 spin_unlock(&rbd_dev_list_lock);
/* No match: unlock on the fall-through path as well. */
3154 spin_unlock(&rbd_dev_list_lock);
/*
 * rbd_dev_release() -- driver-model release callback for an rbd device.
 * Tears down, in order: the osd linger (watch) request, the watch event,
 * the ceph client reference, the gendisk and block major, header fields,
 * the allocated name strings, and finally the device id and module ref.
 *
 * NOTE(review): some intermediate lines are elided in this excerpt;
 * code text left byte-identical.
 */
3158 static void rbd_dev_release(struct device *dev)
3160 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the persistent (linger) watch request registered with the OSDs. */
3162 if (rbd_dev->watch_request) {
3163 struct ceph_client *client = rbd_dev->rbd_client->client;
3165 ceph_osdc_unregister_linger_request(&client->osdc,
3166 rbd_dev->watch_request);
3168 if (rbd_dev->watch_event)
3169 rbd_req_sync_unwatch(rbd_dev);
3171 rbd_put_client(rbd_dev);
3173 /* clean up and free blkdev */
3174 rbd_free_disk(rbd_dev);
3175 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3177 /* release allocated disk header fields */
3178 rbd_header_free(&rbd_dev->header);
3180 /* done with the id, and with the rbd_dev */
3181 kfree(rbd_dev->mapping.snap_name);
3182 kfree(rbd_dev->image_id);
3183 kfree(rbd_dev->header_name);
3184 kfree(rbd_dev->pool_name);
3185 kfree(rbd_dev->image_name);
3186 rbd_dev_id_put(rbd_dev);
3189 /* release module ref */
3190 module_put(THIS_MODULE);
/*
 * rbd_remove() -- sysfs bus "remove" attribute store handler.
 * Parses a decimal device id from the user buffer, locates the matching
 * rbd_device under ctl_mutex, and tears it down via the bus/sysfs path.
 *
 * NOTE(review): error returns and the not-found path are elided in this
 * excerpt; code text left byte-identical.
 */
3193 static ssize_t rbd_remove(struct bus_type *bus,
3197 struct rbd_device *rbd_dev = NULL;
3202 rc = strict_strtoul(buf, 10, &ul);
3206 /* convert to int; abort if we lost anything in the conversion */
3207 target_id = (int) ul;
3208 if (target_id != ul)
/* Serialize against other control operations on the rbd bus. */
3211 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3213 rbd_dev = __rbd_get_dev(target_id);
/* Drop snapshot devices first, then the device itself. */
3219 __rbd_remove_all_snaps(rbd_dev);
3220 rbd_bus_del_dev(rbd_dev);
3223 mutex_unlock(&ctl_mutex);
3229 * create control files in sysfs
/*
 * rbd_sysfs_init() -- register the rbd root device and bus type.
 * On bus registration failure the root device is unregistered again
 * (intermediate error-check lines are elided in this excerpt).
 */
3232 static int rbd_sysfs_init(void)
3236 ret = device_register(&rbd_root_dev);
3240 ret = bus_register(&rbd_bus_type);
/* Undo the root-device registration if bus_register() failed. */
3242 device_unregister(&rbd_root_dev);
/* Tear down sysfs state: unregister the bus, then the root device. */
3247 static void rbd_sysfs_cleanup(void)
3249 bus_unregister(&rbd_bus_type);
3250 device_unregister(&rbd_root_dev);
/*
 * rbd_init() -- module entry point: set up sysfs control files and
 * announce the driver.  (Error-return lines are elided in this excerpt.)
 */
3253 int __init rbd_init(void)
3257 rc = rbd_sysfs_init();
3260 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* rbd_exit() -- module exit point: remove the sysfs control files. */
3264 void __exit rbd_exit(void)
3266 rbd_sysfs_cleanup();
/* Module entry/exit hooks and metadata. */
3269 module_init(rbd_init);
3270 module_exit(rbd_exit);
3272 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3273 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3274 MODULE_DESCRIPTION("rados block device");
3276 /* following authorship retained from original osdblk.c */
3277 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3279 MODULE_LICENSE("GPL");