2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
97 * an instance of the client. multiple devices may share an rbd client.
100 struct ceph_client *client;
102 struct list_head node;
106 * a request completion status
108 struct rbd_req_status {
115 * a collection of requests
117 struct rbd_req_coll {
121 struct rbd_req_status status[0];
125 * a single io request
128 struct request *rq; /* blk layer request */
129 struct bio *bio; /* cloned bio */
130 struct page **pages; /* list of used pages */
133 struct rbd_req_coll *coll;
140 struct list_head node;
148 int dev_id; /* blkdev unique id */
150 int major; /* blkdev assigned major */
151 struct gendisk *disk; /* blkdev's gendisk and rq */
152 struct request_queue *q;
154 struct rbd_options rbd_opts;
155 struct rbd_client *rbd_client;
157 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159 spinlock_t lock; /* queue lock */
161 struct rbd_image_header header;
163 size_t image_name_len;
168 struct ceph_osd_event *watch_event;
169 struct ceph_osd_request *watch_request;
171 /* protects updating the header */
172 struct rw_semaphore header_rwsem;
173 /* name of the snapshot this device reads from */
175 /* id of the snapshot this device reads from */
176 u64 snap_id; /* current snapshot id */
177 /* whether the snap_id this device reads from still exists */
181 struct list_head node;
183 /* list of snapshots */
184 struct list_head snaps;
190 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
192 static LIST_HEAD(rbd_dev_list); /* devices */
193 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195 static LIST_HEAD(rbd_client_list); /* clients */
196 static DEFINE_SPINLOCK(rbd_client_list_lock);
198 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199 static void rbd_dev_release(struct device *dev);
200 static ssize_t rbd_snap_add(struct device *dev,
201 struct device_attribute *attr,
204 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 static struct bus_attribute rbd_bus_attrs[] = {
212 __ATTR(add, S_IWUSR, NULL, rbd_add),
213 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217 static struct bus_type rbd_bus_type = {
219 .bus_attrs = rbd_bus_attrs,
222 static void rbd_root_dev_release(struct device *dev)
226 static struct device rbd_root_dev = {
228 .release = rbd_root_dev_release,
232 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 return get_device(&rbd_dev->dev);
237 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 put_device(&rbd_dev->dev);
242 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
251 rbd_get_dev(rbd_dev);
252 set_device_ro(bdev, rbd_dev->read_only);
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 struct rbd_device *rbd_dev = disk->private_data;
261 rbd_put_dev(rbd_dev);
266 static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
269 .release = rbd_release,
273 * Initialize an rbd client instance.
276 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
278 struct rbd_client *rbdc;
281 dout("rbd_client_create\n");
282 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286 kref_init(&rbdc->kref);
287 INIT_LIST_HEAD(&rbdc->node);
289 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
292 if (IS_ERR(rbdc->client))
294 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
296 ret = ceph_open_session(rbdc->client);
300 spin_lock(&rbd_client_list_lock);
301 list_add_tail(&rbdc->node, &rbd_client_list);
302 spin_unlock(&rbd_client_list_lock);
304 mutex_unlock(&ctl_mutex);
306 dout("rbd_client_create created %p\n", rbdc);
310 ceph_destroy_client(rbdc->client);
312 mutex_unlock(&ctl_mutex);
316 ceph_destroy_options(ceph_opts);
321 * Find a ceph client with specific addr and configuration. If
322 * found, bump its reference count.
324 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
326 struct rbd_client *client_node;
329 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
332 spin_lock(&rbd_client_list_lock);
333 list_for_each_entry(client_node, &rbd_client_list, node) {
334 if (!ceph_compare_options(ceph_opts, client_node->client)) {
335 kref_get(&client_node->kref);
340 spin_unlock(&rbd_client_list_lock);
342 return found ? client_node : NULL;
353 /* string args above */
356 static match_table_t rbd_opts_tokens = {
357 {Opt_notify_timeout, "notify_timeout=%d"},
359 /* string args above */
363 static int parse_rbd_opts_token(char *c, void *private)
365 struct rbd_options *rbd_opts = private;
366 substring_t argstr[MAX_OPT_ARGS];
367 int token, intval, ret;
369 token = match_token(c, rbd_opts_tokens, argstr);
373 if (token < Opt_last_int) {
374 ret = match_int(&argstr[0], &intval);
376 pr_err("bad mount option arg (not int) "
380 dout("got int token %d val %d\n", token, intval);
381 } else if (token > Opt_last_int && token < Opt_last_string) {
382 dout("got string token %d val %s\n", token,
385 dout("got token %d\n", token);
389 case Opt_notify_timeout:
390 rbd_opts->notify_timeout = intval;
399 * Get a ceph client with specific addr and configuration, if one does
400 * not exist create it.
402 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
403 size_t mon_addr_len, char *options)
405 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
406 struct ceph_options *ceph_opts;
407 struct rbd_client *rbdc;
409 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 ceph_opts = ceph_parse_options(options, mon_addr,
412 mon_addr + mon_addr_len,
413 parse_rbd_opts_token, rbd_opts);
414 if (IS_ERR(ceph_opts))
415 return PTR_ERR(ceph_opts);
417 rbdc = rbd_client_find(ceph_opts);
419 /* using an existing client */
420 ceph_destroy_options(ceph_opts);
422 rbdc = rbd_client_create(ceph_opts);
424 return PTR_ERR(rbdc);
426 rbd_dev->rbd_client = rbdc;
432 * Destroy ceph client
434 * Caller must hold rbd_client_list_lock.
436 static void rbd_client_release(struct kref *kref)
438 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
440 dout("rbd_release_client %p\n", rbdc);
441 spin_lock(&rbd_client_list_lock);
442 list_del(&rbdc->node);
443 spin_unlock(&rbd_client_list_lock);
445 ceph_destroy_client(rbdc->client);
450 * Drop reference to ceph client node. If it's not referenced anymore, release
453 static void rbd_put_client(struct rbd_device *rbd_dev)
455 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
456 rbd_dev->rbd_client = NULL;
460 * Destroy requests collection
462 static void rbd_coll_release(struct kref *kref)
464 struct rbd_req_coll *coll =
465 container_of(kref, struct rbd_req_coll, kref);
467 dout("rbd_coll_release %p\n", coll);
471 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
476 /* The header has to start with the magic rbd header text */
477 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
481 * The size of a snapshot header has to fit in a size_t, and
482 * that limits the number of snapshots.
484 snap_count = le32_to_cpu(ondisk->snap_count);
485 size = SIZE_MAX - sizeof (struct ceph_snap_context);
486 if (snap_count > size / sizeof (__le64))
490 * Not only that, but the size of the entire the snapshot
491 * header must also be representable in a size_t.
493 size -= snap_count * sizeof (__le64);
494 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
501 * Create a new header structure, translate header format from the on-disk
504 static int rbd_header_from_disk(struct rbd_image_header *header,
505 struct rbd_image_header_ondisk *ondisk)
512 memset(header, 0, sizeof (*header));
514 snap_count = le32_to_cpu(ondisk->snap_count);
516 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
517 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
518 if (!header->object_prefix)
520 memcpy(header->object_prefix, ondisk->object_prefix, len);
521 header->object_prefix[len] = '\0';
524 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
526 /* Save a copy of the snapshot names */
528 if (snap_names_len > (u64) SIZE_MAX)
530 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
531 if (!header->snap_names)
534 * Note that rbd_dev_v1_header_read() guarantees
535 * the ondisk buffer we're working with has
536 * snap_names_len bytes beyond the end of the
537 * snapshot id array, this memcpy() is safe.
539 memcpy(header->snap_names, &ondisk->snaps[snap_count],
542 /* Record each snapshot's size */
544 size = snap_count * sizeof (*header->snap_sizes);
545 header->snap_sizes = kmalloc(size, GFP_KERNEL);
546 if (!header->snap_sizes)
548 for (i = 0; i < snap_count; i++)
549 header->snap_sizes[i] =
550 le64_to_cpu(ondisk->snaps[i].image_size);
552 WARN_ON(ondisk->snap_names_len);
553 header->snap_names = NULL;
554 header->snap_sizes = NULL;
557 header->image_size = le64_to_cpu(ondisk->image_size);
558 header->obj_order = ondisk->options.order;
559 header->crypt_type = ondisk->options.crypt_type;
560 header->comp_type = ondisk->options.comp_type;
561 header->total_snaps = snap_count;
563 /* Allocate and fill in the snapshot context */
565 size = sizeof (struct ceph_snap_context);
566 size += snap_count * sizeof (header->snapc->snaps[0]);
567 header->snapc = kzalloc(size, GFP_KERNEL);
571 atomic_set(&header->snapc->nref, 1);
572 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
573 header->snapc->num_snaps = snap_count;
574 for (i = 0; i < snap_count; i++)
575 header->snapc->snaps[i] =
576 le64_to_cpu(ondisk->snaps[i].id);
581 kfree(header->snap_sizes);
582 header->snap_sizes = NULL;
583 kfree(header->snap_names);
584 header->snap_names = NULL;
585 kfree(header->object_prefix);
586 header->object_prefix = NULL;
591 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
595 char *p = header->snap_names;
597 for (i = 0; i < header->total_snaps; i++) {
598 if (!strcmp(snap_name, p)) {
600 /* Found it. Pass back its id and/or size */
603 *seq = header->snapc->snaps[i];
605 *size = header->snap_sizes[i];
608 p += strlen(p) + 1; /* Skip ahead to the next name */
613 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
617 down_write(&rbd_dev->header_rwsem);
619 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
620 sizeof (RBD_SNAP_HEAD_NAME))) {
621 rbd_dev->snap_id = CEPH_NOSNAP;
622 rbd_dev->snap_exists = false;
623 rbd_dev->read_only = 0;
625 *size = rbd_dev->header.image_size;
629 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
633 rbd_dev->snap_id = snap_id;
634 rbd_dev->snap_exists = true;
635 rbd_dev->read_only = 1;
640 up_write(&rbd_dev->header_rwsem);
644 static void rbd_header_free(struct rbd_image_header *header)
646 kfree(header->object_prefix);
647 header->object_prefix = NULL;
648 kfree(header->snap_sizes);
649 header->snap_sizes = NULL;
650 kfree(header->snap_names);
651 header->snap_names = NULL;
652 ceph_put_snap_context(header->snapc);
653 header->snapc = NULL;
657 * get the actual striped segment name, offset and length
659 static u64 rbd_get_segment(struct rbd_image_header *header,
660 const char *object_prefix,
662 char *seg_name, u64 *segofs)
664 u64 seg = ofs >> header->obj_order;
667 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
668 "%s.%012llx", object_prefix, seg);
670 ofs = ofs & ((1 << header->obj_order) - 1);
671 len = min_t(u64, len, (1 << header->obj_order) - ofs);
679 static int rbd_get_num_segments(struct rbd_image_header *header,
682 u64 start_seg = ofs >> header->obj_order;
683 u64 end_seg = (ofs + len - 1) >> header->obj_order;
684 return end_seg - start_seg + 1;
688 * returns the size of an object in the image
690 static u64 rbd_obj_bytes(struct rbd_image_header *header)
692 return 1 << header->obj_order;
699 static void bio_chain_put(struct bio *chain)
705 chain = chain->bi_next;
711 * zeros a bio chain, starting at specific offset
713 static void zero_bio_chain(struct bio *chain, int start_ofs)
722 bio_for_each_segment(bv, chain, i) {
723 if (pos + bv->bv_len > start_ofs) {
724 int remainder = max(start_ofs - pos, 0);
725 buf = bvec_kmap_irq(bv, &flags);
726 memset(buf + remainder, 0,
727 bv->bv_len - remainder);
728 bvec_kunmap_irq(buf, &flags);
733 chain = chain->bi_next;
738 * bio_chain_clone - clone a chain of bios up to a certain length.
739 * might return a bio_pair that will need to be released.
741 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
742 struct bio_pair **bp,
743 int len, gfp_t gfpmask)
745 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
749 bio_pair_release(*bp);
753 while (old_chain && (total < len)) {
754 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
758 if (total + old_chain->bi_size > len) {
762 * this split can only happen with a single paged bio,
763 * split_bio will BUG_ON if this is not the case
765 dout("bio_chain_clone split! total=%d remaining=%d"
767 total, len - total, old_chain->bi_size);
769 /* split the bio. We'll release it either in the next
770 call, or it will have to be released outside */
771 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
775 __bio_clone(tmp, &bp->bio1);
779 __bio_clone(tmp, old_chain);
780 *next = old_chain->bi_next;
784 gfpmask &= ~__GFP_WAIT;
788 new_chain = tail = tmp;
793 old_chain = old_chain->bi_next;
795 total += tmp->bi_size;
801 tail->bi_next = NULL;
808 dout("bio_chain_clone with err\n");
809 bio_chain_put(new_chain);
814 * helpers for osd request op vectors.
816 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
817 int opcode, u32 payload_len)
819 struct ceph_osd_req_op *ops;
821 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
828 * op extent offset and length will be set later on
829 * in calc_raw_layout()
831 ops[0].payload_len = payload_len;
836 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
841 static void rbd_coll_end_req_index(struct request *rq,
842 struct rbd_req_coll *coll,
846 struct request_queue *q;
849 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
850 coll, index, ret, (unsigned long long) len);
856 blk_end_request(rq, ret, len);
862 spin_lock_irq(q->queue_lock);
863 coll->status[index].done = 1;
864 coll->status[index].rc = ret;
865 coll->status[index].bytes = len;
866 max = min = coll->num_done;
867 while (max < coll->total && coll->status[max].done)
870 for (i = min; i<max; i++) {
871 __blk_end_request(rq, coll->status[i].rc,
872 coll->status[i].bytes);
874 kref_put(&coll->kref, rbd_coll_release);
876 spin_unlock_irq(q->queue_lock);
879 static void rbd_coll_end_req(struct rbd_request *req,
882 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
886 * Send ceph osd request
888 static int rbd_do_request(struct request *rq,
889 struct rbd_device *rbd_dev,
890 struct ceph_snap_context *snapc,
892 const char *object_name, u64 ofs, u64 len,
897 struct ceph_osd_req_op *ops,
898 struct rbd_req_coll *coll,
900 void (*rbd_cb)(struct ceph_osd_request *req,
901 struct ceph_msg *msg),
902 struct ceph_osd_request **linger_req,
905 struct ceph_osd_request *req;
906 struct ceph_file_layout *layout;
909 struct timespec mtime = CURRENT_TIME;
910 struct rbd_request *req_data;
911 struct ceph_osd_request_head *reqhead;
912 struct ceph_osd_client *osdc;
914 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
917 rbd_coll_end_req_index(rq, coll, coll_index,
923 req_data->coll = coll;
924 req_data->coll_index = coll_index;
927 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
928 (unsigned long long) ofs, (unsigned long long) len);
930 osdc = &rbd_dev->rbd_client->client->osdc;
931 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
932 false, GFP_NOIO, pages, bio);
938 req->r_callback = rbd_cb;
942 req_data->pages = pages;
945 req->r_priv = req_data;
947 reqhead = req->r_request->front.iov_base;
948 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
950 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
951 req->r_oid_len = strlen(req->r_oid);
953 layout = &req->r_file_layout;
954 memset(layout, 0, sizeof(*layout));
955 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
956 layout->fl_stripe_count = cpu_to_le32(1);
957 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
958 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
959 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
962 ceph_osdc_build_request(req, ofs, &len,
966 req->r_oid, req->r_oid_len);
969 ceph_osdc_set_request_linger(osdc, req);
973 ret = ceph_osdc_start_request(osdc, req, false);
978 ret = ceph_osdc_wait_request(osdc, req);
980 *ver = le64_to_cpu(req->r_reassert_version.version);
981 dout("reassert_ver=%llu\n",
983 le64_to_cpu(req->r_reassert_version.version));
984 ceph_osdc_put_request(req);
989 bio_chain_put(req_data->bio);
990 ceph_osdc_put_request(req);
992 rbd_coll_end_req(req_data, ret, len);
998 * Ceph osd op callback
1000 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1002 struct rbd_request *req_data = req->r_priv;
1003 struct ceph_osd_reply_head *replyhead;
1004 struct ceph_osd_op *op;
1010 replyhead = msg->front.iov_base;
1011 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1012 op = (void *)(replyhead + 1);
1013 rc = le32_to_cpu(replyhead->result);
1014 bytes = le64_to_cpu(op->extent.length);
1015 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1017 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1018 (unsigned long long) bytes, read_op, (int) rc);
1020 if (rc == -ENOENT && read_op) {
1021 zero_bio_chain(req_data->bio, 0);
1023 } else if (rc == 0 && read_op && bytes < req_data->len) {
1024 zero_bio_chain(req_data->bio, bytes);
1025 bytes = req_data->len;
1028 rbd_coll_end_req(req_data, rc, bytes);
1031 bio_chain_put(req_data->bio);
1033 ceph_osdc_put_request(req);
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1043 * Do a synchronous ceph osd operation
1045 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1046 struct ceph_snap_context *snapc,
1049 struct ceph_osd_req_op *ops,
1050 const char *object_name,
1053 struct ceph_osd_request **linger_req,
1057 struct page **pages;
1060 BUG_ON(ops == NULL);
1062 num_pages = calc_pages_for(ofs , len);
1063 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1065 return PTR_ERR(pages);
1067 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1068 object_name, ofs, len, NULL,
1078 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1079 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1082 ceph_release_page_vector(pages, num_pages);
1087 * Do an asynchronous ceph osd operation
1089 static int rbd_do_op(struct request *rq,
1090 struct rbd_device *rbd_dev,
1091 struct ceph_snap_context *snapc,
1093 int opcode, int flags,
1096 struct rbd_req_coll *coll,
1103 struct ceph_osd_req_op *ops;
1106 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1110 seg_len = rbd_get_segment(&rbd_dev->header,
1111 rbd_dev->header.object_prefix,
1113 seg_name, &seg_ofs);
1115 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1118 ops = rbd_create_rw_ops(1, opcode, payload_len);
1122 /* we've taken care of segment sizes earlier when we
1123 cloned the bios. We should never have a segment
1124 truncated at this point */
1125 BUG_ON(seg_len < len);
1127 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1128 seg_name, seg_ofs, seg_len,
1134 rbd_req_cb, 0, NULL);
1136 rbd_destroy_ops(ops);
1143 * Request async osd write
1145 static int rbd_req_write(struct request *rq,
1146 struct rbd_device *rbd_dev,
1147 struct ceph_snap_context *snapc,
1150 struct rbd_req_coll *coll,
1153 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1155 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1156 ofs, len, bio, coll, coll_index);
1160 * Request async osd read
1162 static int rbd_req_read(struct request *rq,
1163 struct rbd_device *rbd_dev,
1167 struct rbd_req_coll *coll,
1170 return rbd_do_op(rq, rbd_dev, NULL,
1174 ofs, len, bio, coll, coll_index);
1178 * Request sync osd read
1180 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1182 const char *object_name,
1187 struct ceph_osd_req_op *ops;
1190 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1194 ret = rbd_req_sync_op(rbd_dev, NULL,
1197 ops, object_name, ofs, len, buf, NULL, ver);
1198 rbd_destroy_ops(ops);
1204 * Request sync osd watch
1206 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1210 struct ceph_osd_req_op *ops;
1213 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1217 ops[0].watch.ver = cpu_to_le64(ver);
1218 ops[0].watch.cookie = notify_id;
1219 ops[0].watch.flag = 0;
1221 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1222 rbd_dev->header_name, 0, 0, NULL,
1227 rbd_simple_req_cb, 0, NULL);
1229 rbd_destroy_ops(ops);
1233 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1235 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1242 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1243 rbd_dev->header_name, (unsigned long long) notify_id,
1244 (unsigned int) opcode);
1245 rc = rbd_refresh_header(rbd_dev, &hver);
1247 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1248 " update snaps: %d\n", rbd_dev->major, rc);
1250 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1254 * Request sync osd watch
1256 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1258 struct ceph_osd_req_op *ops;
1259 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1262 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1266 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1267 (void *)rbd_dev, &rbd_dev->watch_event);
1271 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1272 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1273 ops[0].watch.flag = 1;
1275 ret = rbd_req_sync_op(rbd_dev, NULL,
1277 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1279 rbd_dev->header_name,
1281 &rbd_dev->watch_request, NULL);
1286 rbd_destroy_ops(ops);
1290 ceph_osdc_cancel_event(rbd_dev->watch_event);
1291 rbd_dev->watch_event = NULL;
1293 rbd_destroy_ops(ops);
1298 * Request sync osd unwatch
1300 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1302 struct ceph_osd_req_op *ops;
1305 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1309 ops[0].watch.ver = 0;
1310 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1311 ops[0].watch.flag = 0;
1313 ret = rbd_req_sync_op(rbd_dev, NULL,
1315 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1317 rbd_dev->header_name,
1318 0, 0, NULL, NULL, NULL);
1321 rbd_destroy_ops(ops);
1322 ceph_osdc_cancel_event(rbd_dev->watch_event);
1323 rbd_dev->watch_event = NULL;
1327 struct rbd_notify_info {
1328 struct rbd_device *rbd_dev;
1331 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1333 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1337 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1338 rbd_dev->header_name, (unsigned long long) notify_id,
1339 (unsigned int) opcode);
1343 * Request sync osd notify
1345 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1347 struct ceph_osd_req_op *ops;
1348 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1349 struct ceph_osd_event *event;
1350 struct rbd_notify_info info;
1351 int payload_len = sizeof(u32) + sizeof(u32);
1354 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1358 info.rbd_dev = rbd_dev;
1360 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1361 (void *)&info, &event);
1365 ops[0].watch.ver = 1;
1366 ops[0].watch.flag = 1;
1367 ops[0].watch.cookie = event->cookie;
1368 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1369 ops[0].watch.timeout = 12;
1371 ret = rbd_req_sync_op(rbd_dev, NULL,
1373 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1375 rbd_dev->header_name,
1376 0, 0, NULL, NULL, NULL);
1380 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1381 dout("ceph_osdc_wait_event returned %d\n", ret);
1382 rbd_destroy_ops(ops);
1386 ceph_osdc_cancel_event(event);
1388 rbd_destroy_ops(ops);
1393 * Request sync osd read
1395 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1396 const char *object_name,
1397 const char *class_name,
1398 const char *method_name,
1403 struct ceph_osd_req_op *ops;
1404 int class_name_len = strlen(class_name);
1405 int method_name_len = strlen(method_name);
1408 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1409 class_name_len + method_name_len + len);
1413 ops[0].cls.class_name = class_name;
1414 ops[0].cls.class_len = (__u8) class_name_len;
1415 ops[0].cls.method_name = method_name;
1416 ops[0].cls.method_len = (__u8) method_name_len;
1417 ops[0].cls.argc = 0;
1418 ops[0].cls.indata = data;
1419 ops[0].cls.indata_len = len;
1421 ret = rbd_req_sync_op(rbd_dev, NULL,
1423 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1425 object_name, 0, 0, NULL, NULL, ver);
1427 rbd_destroy_ops(ops);
1429 dout("cls_exec returned %d\n", ret);
1433 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1435 struct rbd_req_coll *coll =
1436 kzalloc(sizeof(struct rbd_req_coll) +
1437 sizeof(struct rbd_req_status) * num_reqs,
1442 coll->total = num_reqs;
1443 kref_init(&coll->kref);
1448 * block device queue callback
1450 static void rbd_rq_fn(struct request_queue *q)
1452 struct rbd_device *rbd_dev = q->queuedata;
1454 struct bio_pair *bp = NULL;
1456 while ((rq = blk_fetch_request(q))) {
1458 struct bio *rq_bio, *next_bio = NULL;
1463 int num_segs, cur_seg = 0;
1464 struct rbd_req_coll *coll;
1465 struct ceph_snap_context *snapc;
1467 /* peek at request from block layer */
1471 dout("fetched request\n");
1473 /* filter out block requests we don't understand */
1474 if ((rq->cmd_type != REQ_TYPE_FS)) {
1475 __blk_end_request_all(rq, 0);
1479 /* deduce our operation (read, write) */
1480 do_write = (rq_data_dir(rq) == WRITE);
1482 size = blk_rq_bytes(rq);
1483 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1485 if (do_write && rbd_dev->read_only) {
1486 __blk_end_request_all(rq, -EROFS);
1490 spin_unlock_irq(q->queue_lock);
1492 down_read(&rbd_dev->header_rwsem);
1494 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1495 up_read(&rbd_dev->header_rwsem);
1496 dout("request for non-existent snapshot");
1497 spin_lock_irq(q->queue_lock);
1498 __blk_end_request_all(rq, -ENXIO);
1502 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1504 up_read(&rbd_dev->header_rwsem);
1506 dout("%s 0x%x bytes at 0x%llx\n",
1507 do_write ? "write" : "read",
1508 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1510 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1511 coll = rbd_alloc_coll(num_segs);
1513 spin_lock_irq(q->queue_lock);
1514 __blk_end_request_all(rq, -ENOMEM);
1515 ceph_put_snap_context(snapc);
1520 /* a bio clone to be passed down to OSD req */
1521 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1522 op_size = rbd_get_segment(&rbd_dev->header,
1523 rbd_dev->header.object_prefix,
1526 kref_get(&coll->kref);
1527 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1528 op_size, GFP_ATOMIC);
1530 rbd_coll_end_req_index(rq, coll, cur_seg,
1536 /* init OSD command: write or read */
1538 rbd_req_write(rq, rbd_dev,
1544 rbd_req_read(rq, rbd_dev,
1557 kref_put(&coll->kref, rbd_coll_release);
1560 bio_pair_release(bp);
1561 spin_lock_irq(q->queue_lock);
1563 ceph_put_snap_context(snapc);
1568 * a queue callback. Makes sure that we don't create a bio that spans across
1569 * multiple osd objects. One exception would be with a single page bios,
1570 * which we handle later at bio_chain_clone
1572 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1573 struct bio_vec *bvec)
1575 struct rbd_device *rbd_dev = q->queuedata;
1576 unsigned int chunk_sectors;
1578 unsigned int bio_sectors;
1581 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1582 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1583 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1585 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1586 + bio_sectors)) << SECTOR_SHIFT;
1588 max = 0; /* bio_add cannot handle a negative return */
1589 if (max <= bvec->bv_len && bio_sectors == 0)
1590 return bvec->bv_len;
1594 static void rbd_free_disk(struct rbd_device *rbd_dev)
1596 struct gendisk *disk = rbd_dev->disk;
1601 rbd_header_free(&rbd_dev->header);
1603 if (disk->flags & GENHD_FL_UP)
1606 blk_cleanup_queue(disk->queue);
1611 * Read the complete header for the given rbd device.
1613 * Returns a pointer to a dynamically-allocated buffer containing
1614 * the complete and validated header. Caller can pass the address
1615 * of a variable that will be filled in with the version of the
1616 * header object at the time it was read.
1618 * Returns a pointer-coded errno if a failure occurs.
1620 static struct rbd_image_header_ondisk *
1621 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1623 struct rbd_image_header_ondisk *ondisk = NULL;
1630 * The complete header will include an array of its 64-bit
1631 * snapshot ids, followed by the names of those snapshots as
1632 * a contiguous block of NUL-terminated strings. Note that
1633 * the number of snapshots could change by the time we read
1634 * it in, in which case we re-read it.
/* size = fixed header plus one on-disk record per snapshot */
1641 size = sizeof (*ondisk);
1642 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1644 ondisk = kmalloc(size, GFP_KERNEL);
1646 return ERR_PTR(-ENOMEM);
/* synchronous read of the header object from the osd */
1648 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1649 rbd_dev->header_name,
1651 (char *) ondisk, version);
/* a short read means the header is truncated or changed under us */
1655 if (WARN_ON((size_t) ret < size)) {
1657 pr_warning("short header read for image %s"
1658 " (want %zd got %d)\n",
1659 rbd_dev->image_name, size, ret);
1662 if (!rbd_dev_ondisk_valid(ondisk)) {
1664 pr_warning("invalid header for image %s\n",
1665 rbd_dev->image_name);
/* loop again if the snapshot count changed while we were reading */
1669 names_size = le64_to_cpu(ondisk->snap_names_len);
1670 want_count = snap_count;
1671 snap_count = le32_to_cpu(ondisk->snap_count);
1672 } while (snap_count != want_count);
1679 return ERR_PTR(ret);
1683 * Reload the on-disk header and convert it to the in-core form.
1685 static int rbd_read_header(struct rbd_device *rbd_dev,
1686 struct rbd_image_header *header)
1688 struct rbd_image_header_ondisk *ondisk;
/* fetch the raw header (and its object version) from the osd */
1692 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1694 return PTR_ERR(ondisk);
1695 ret = rbd_header_from_disk(header, ondisk);
/* remember which version of the header object this data came from */
1697 header->obj_version = ver;
/*
 * Create a new snapshot of the mapped image: allocate a snapshot id
 * from the monitor, then record the (name, id) pair on the header
 * object via a synchronous class-method call to the osd.
 */
1706 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1707 const char *snap_name,
1710 int name_len = strlen(snap_name);
1714 struct ceph_mon_client *monc;
1716 /* we should create a snapshot only if we're pointing at the head */
1717 if (rbd_dev->snap_id != CEPH_NOSNAP)
1720 monc = &rbd_dev->rbd_client->client->monc;
1721 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1722 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* buffer for the encoded string + u64 payload; 16 covers length + id */
1726 data = kmalloc(name_len + 16, gfp_flags);
1731 e = data + name_len + 16;
/* encode (snap_name, new_snapid); jumps to 'bad' on buffer overrun */
1733 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1734 ceph_encode_64_safe(&p, e, new_snapid, bad);
1736 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1738 data, p - data, NULL);
/* the sync exec may return a positive byte count; normalize to 0 */
1742 return ret < 0 ? ret : 0;
/*
 * Unregister and remove every snapshot device on the rbd_dev's list.
 * NOTE(review): double-underscore naming suggests the caller holds
 * ctl_mutex (rbd_remove() does) — confirm for any new caller.
 */
1747 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1749 struct rbd_snap *snap;
1750 struct rbd_snap *next;
/* _safe variant: __rbd_remove_snap_dev() deletes the current entry */
1752 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1753 __rbd_remove_snap_dev(snap);
1757 * only read the first part of the ondisk header, without the snaps info
1759 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1762 struct rbd_image_header h;
/* read a fresh copy of the header into a local, then swap it in */
1764 ret = rbd_read_header(rbd_dev, &h);
1768 down_write(&rbd_dev->header_rwsem);
/* resize only when mapping the head; a snapshot's size is fixed */
1771 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1772 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1774 dout("setting size to %llu sectors", (unsigned long long) size);
1775 set_capacity(rbd_dev->disk, size);
1778 /* rbd_dev->header.object_prefix shouldn't change */
1779 kfree(rbd_dev->header.snap_sizes);
1780 kfree(rbd_dev->header.snap_names);
1781 /* osd requests may still refer to snapc */
1782 ceph_put_snap_context(rbd_dev->header.snapc);
1785 *hver = h.obj_version;
1786 rbd_dev->header.obj_version = h.obj_version;
1787 rbd_dev->header.image_size = h.image_size;
1788 rbd_dev->header.total_snaps = h.total_snaps;
1789 rbd_dev->header.snapc = h.snapc;
1790 rbd_dev->header.snap_names = h.snap_names;
1791 rbd_dev->header.snap_sizes = h.snap_sizes;
1792 /* Free the extra copy of the object prefix */
1793 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1794 kfree(h.object_prefix);
/* rebuild the snapshot device list from the new snap context */
1796 ret = __rbd_init_snaps_header(rbd_dev);
1798 up_write(&rbd_dev->header_rwsem);
/* Locked wrapper: refresh the header while holding ctl_mutex. */
1803 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1807 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1808 ret = __rbd_refresh_header(rbd_dev, hver);
1809 mutex_unlock(&ctl_mutex);
/*
 * Create and configure the gendisk + request queue for @rbd_dev:
 * read the image header, build the snapshot list, select the mapped
 * snapshot, set queue limits to the rbd object size, and announce the
 * disk (add_disk is in an elided line after set_capacity).
 */
1814 static int rbd_init_disk(struct rbd_device *rbd_dev)
1816 struct gendisk *disk;
1817 struct request_queue *q;
1822 /* contact OSD, request size info about the object being mapped */
1823 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1827 /* no need to lock here, as rbd_dev is not registered yet */
1828 rc = __rbd_init_snaps_header(rbd_dev);
/* pick the mapped snapshot (or head) and learn its size */
1832 rc = rbd_header_set_snap(rbd_dev, &total_size);
1836 /* create gendisk info */
1838 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1842 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1844 disk->major = rbd_dev->major;
1845 disk->first_minor = 0;
1846 disk->fops = &rbd_bd_ops;
1847 disk->private_data = rbd_dev;
/* request-fn style queue; rbd_dev->lock serializes request handling */
1851 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1855 /* We use the default size, but let's be explicit about it. */
1856 blk_queue_physical_block_size(q, SECTOR_SIZE);
1858 /* set io sizes to object size */
1859 segment_size = rbd_obj_bytes(&rbd_dev->header);
1860 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1861 blk_queue_max_segment_size(q, segment_size);
1862 blk_queue_io_min(q, segment_size);
1863 blk_queue_io_opt(q, segment_size);
/* keep bios from crossing object boundaries (see rbd_merge_bvec) */
1865 blk_queue_merge_bvec(q, rbd_merge_bvec);
1868 q->queuedata = rbd_dev;
1870 rbd_dev->disk = disk;
1873 /* finally, announce the disk to the world */
1874 set_capacity(disk, total_size / SECTOR_SIZE);
1877 pr_info("%s: added with size 0x%llx\n",
1878 disk->disk_name, (unsigned long long)total_size);
/* Map the struct device embedded in an rbd_device back to its owner. */
1891 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1893 return container_of(dev, struct rbd_device, dev);
/* sysfs 'size' attribute: mapped image size in bytes. */
1896 static ssize_t rbd_size_show(struct device *dev,
1897 struct device_attribute *attr, char *buf)
1899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* capacity is in 512-byte sectors; lock out a concurrent refresh */
1902 down_read(&rbd_dev->header_rwsem);
1903 size = get_capacity(rbd_dev->disk);
1904 up_read(&rbd_dev->header_rwsem);
1906 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs 'major' attribute: the device's block major number. */
1909 static ssize_t rbd_major_show(struct device *dev,
1910 struct device_attribute *attr, char *buf)
1912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1914 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs 'client_id' attribute: ceph client instance id ("clientN"). */
1917 static ssize_t rbd_client_id_show(struct device *dev,
1918 struct device_attribute *attr, char *buf)
1920 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1922 return sprintf(buf, "client%lld\n",
1923 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs 'pool' attribute: name of the rados pool holding the image. */
1926 static ssize_t rbd_pool_show(struct device *dev,
1927 struct device_attribute *attr, char *buf)
1929 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1931 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs 'pool_id' attribute: numeric id of the rados pool. */
1934 static ssize_t rbd_pool_id_show(struct device *dev,
1935 struct device_attribute *attr, char *buf)
1937 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1939 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs 'name' attribute: the rbd image name. */
1942 static ssize_t rbd_name_show(struct device *dev,
1943 struct device_attribute *attr, char *buf)
1945 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1947 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs 'current_snap' attribute: mapped snapshot name ("-" = head). */
1950 static ssize_t rbd_snap_show(struct device *dev,
1951 struct device_attribute *attr,
1954 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1956 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs 'refresh' store hook: re-read the image header on demand. */
1959 static ssize_t rbd_image_refresh(struct device *dev,
1960 struct device_attribute *attr,
1964 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1967 ret = rbd_refresh_header(rbd_dev, NULL);
/* on success, report the whole input as consumed */
1969 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes: read-only status values, plus two
 * write-only action triggers (refresh, create_snap).
 */
1972 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1973 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1974 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1975 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1976 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1977 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1978 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1979 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1980 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* Attribute array and group wired into rbd_device_type below. */
1982 static struct attribute *rbd_attrs[] = {
1983 &dev_attr_size.attr,
1984 &dev_attr_major.attr,
1985 &dev_attr_client_id.attr,
1986 &dev_attr_pool.attr,
1987 &dev_attr_pool_id.attr,
1988 &dev_attr_name.attr,
1989 &dev_attr_current_snap.attr,
1990 &dev_attr_refresh.attr,
1991 &dev_attr_create_snap.attr,
1995 static struct attribute_group rbd_attr_group = {
1999 static const struct attribute_group *rbd_attr_groups[] = {
/*
 * NOTE(review): body elided in this excerpt — appears to be a stub;
 * the real teardown is done by rbd_dev_release() via the bus device.
 */
2004 static void rbd_sysfs_dev_release(struct device *dev)
/* device type for rbd devices registered on the rbd bus */
2008 static struct device_type rbd_device_type = {
2010 .groups = rbd_attr_groups,
2011 .release = rbd_sysfs_dev_release,
/* sysfs 'snap_size' attribute: image size at this snapshot, in bytes. */
2019 static ssize_t rbd_snap_size_show(struct device *dev,
2020 struct device_attribute *attr,
2023 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2025 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs 'snap_id' attribute: the snapshot's ceph snapshot id. */
2028 static ssize_t rbd_snap_id_show(struct device *dev,
2029 struct device_attribute *attr,
2032 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2034 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes, grouped into rbd_snap_device_type. */
2037 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2038 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2040 static struct attribute *rbd_snap_attrs[] = {
2041 &dev_attr_snap_size.attr,
2042 &dev_attr_snap_id.attr,
2046 static struct attribute_group rbd_snap_attr_group = {
2047 .attrs = rbd_snap_attrs,
/*
 * Device-model release for a snapshot device.
 * NOTE(review): frees appear in elided lines; presumably snap->name
 * and the snap itself are kfree()d here — confirm against full source.
 */
2050 static void rbd_snap_dev_release(struct device *dev)
2052 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2057 static const struct attribute_group *rbd_snap_attr_groups[] = {
2058 &rbd_snap_attr_group,
2062 static struct device_type rbd_snap_device_type = {
2063 .groups = rbd_snap_attr_groups,
2064 .release = rbd_snap_dev_release,
/*
 * Drop a snapshot from the device's list and unregister its device;
 * the final free happens in rbd_snap_dev_release().
 */
2067 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2069 list_del(&snap->node);
2070 device_unregister(&snap->dev);
/*
 * Initialize and register the struct device representing one snapshot,
 * named "snap_<name>" and parented under the rbd device.
 */
2073 static int rbd_register_snap_dev(struct rbd_snap *snap,
2074 struct device *parent)
2076 struct device *dev = &snap->dev;
2079 dev->type = &rbd_snap_device_type;
2080 dev->parent = parent;
2081 dev->release = rbd_snap_dev_release;
2082 dev_set_name(dev, "snap_%s", snap->name);
2083 ret = device_register(dev);
/*
 * Allocate an rbd_snap for snapshot index @i of the current header,
 * fill in its name, size and id, and register a sysfs device for it
 * if the parent rbd device is already registered.
 * Returns the new snap, or a pointer-coded errno on failure.
 */
2088 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2089 int i, const char *name)
2091 struct rbd_snap *snap;
2094 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2096 return ERR_PTR(-ENOMEM);
/* the snap owns its own copy of the name */
2099 snap->name = kstrdup(name, GFP_KERNEL);
2103 snap->size = rbd_dev->header.snap_sizes[i];
2104 snap->id = rbd_dev->header.snapc->snaps[i];
/* during initial setup the parent isn't registered yet; skip sysfs */
2105 if (device_is_registered(&rbd_dev->dev)) {
2106 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2117 return ERR_PTR(ret);
2121 * Scan the rbd device's current snapshot list and compare it to the
2122 * newly-received snapshot context. Remove any existing snapshots
2123 * not present in the new snapshot context. Add a new snapshot for
2124 * any snapshots in the snapshot context not in the current list.
2125 * And verify there are no changes to snapshots we already know
2128 * Assumes the snapshots in the snapshot context are sorted by
2129 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2130 * are also maintained in that order.)
2132 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2134 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2135 const u32 snap_count = snapc->num_snaps;
2136 char *snap_name = rbd_dev->header.snap_names;
2137 struct list_head *head = &rbd_dev->snaps;
2138 struct list_head *links = head->next;
/* merge-walk both (descending-id) sequences until both are exhausted */
2141 while (index < snap_count || links != head) {
2143 struct rbd_snap *snap;
/* CEPH_NOSNAP acts as the sentinel for an exhausted sequence */
2145 snap_id = index < snap_count ? snapc->snaps[index]
2147 snap = links != head ? list_entry(links, struct rbd_snap, node)
2149 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2151 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2152 struct list_head *next = links->next;
2154 /* Existing snapshot not in the new snap context */
/* if the mapped snapshot vanished, mark it nonexistent */
2156 if (rbd_dev->snap_id == snap->id)
2157 rbd_dev->snap_exists = false;
2158 __rbd_remove_snap_dev(snap);
2160 /* Done with this list entry; advance */
2166 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2167 struct rbd_snap *new_snap;
2169 /* We haven't seen this snapshot before */
2171 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2173 if (IS_ERR(new_snap))
2174 return PTR_ERR(new_snap);
2176 /* New goes before existing, or at end of list */
2179 list_add_tail(&new_snap->node, &snap->node);
2181 list_add_tail(&new_snap->node, head);
2183 /* Already have this one */
/* known snapshots must not change size or name */
2185 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2186 BUG_ON(strcmp(snap->name, snap_name));
2188 /* Done with this list entry; advance */
2190 links = links->next;
2193 /* Advance to the next entry in the snapshot context */
/* snapshot names are packed back-to-back, NUL-terminated */
2196 snap_name += strlen(snap_name) + 1;
/*
 * Register the rbd device with the driver core on the rbd bus (named
 * by its numeric id), then register a device for each of its current
 * snapshots.  ctl_mutex is held for the duration.
 */
2202 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2206 struct rbd_snap *snap;
2208 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2209 dev = &rbd_dev->dev;
2211 dev->bus = &rbd_bus_type;
2212 dev->type = &rbd_device_type;
2213 dev->parent = &rbd_root_dev;
2214 dev->release = rbd_dev_release;
2215 dev_set_name(dev, "%d", rbd_dev->dev_id);
2216 ret = device_register(dev);
/* snapshots discovered before registration get their sysfs nodes now */
2220 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2221 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2226 mutex_unlock(&ctl_mutex);
/* Unregister the device; final cleanup runs in rbd_dev_release(). */
2230 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2232 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object so we get notified of
 * changes.  -ERANGE means our cached header version is stale:
 * refresh the header and retry the watch.
 */
2235 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2240 ret = rbd_req_sync_watch(rbd_dev);
2241 if (ret == -ERANGE) {
2242 rc = rbd_refresh_header(rbd_dev, NULL);
2246 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1 (see below). */
2251 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2254 * Get a unique rbd identifier for the given new rbd_dev, and add
2255 * the rbd_dev to the global list. The minimum rbd id is 1.
2257 static void rbd_id_get(struct rbd_device *rbd_dev)
2259 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
/* rbd_dev_list holds every mapped device, protected by its spinlock */
2261 spin_lock(&rbd_dev_list_lock);
2262 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2263 spin_unlock(&rbd_dev_list_lock);
2267 * Remove an rbd_dev from the global list, and record that its
2268 * identifier is no longer in use.
2270 static void rbd_id_put(struct rbd_device *rbd_dev)
2272 struct list_head *tmp;
2273 int rbd_id = rbd_dev->dev_id;
2278 spin_lock(&rbd_dev_list_lock);
2279 list_del_init(&rbd_dev->node);
2282 * If the id being "put" is not the current maximum, there
2283 * is nothing special we need to do.
2285 if (rbd_id != atomic64_read(&rbd_id_max)) {
2286 spin_unlock(&rbd_dev_list_lock);
2291 * We need to update the current maximum id. Search the
2292 * list to find out what it is. We're more likely to find
2293 * the maximum at the end, so search the list backward.
2296 list_for_each_prev(tmp, &rbd_dev_list) {
2297 struct rbd_device *rbd_dev;
2299 rbd_dev = list_entry(tmp, struct rbd_device, node);
2300 if (rbd_id > max_id)
2303 spin_unlock(&rbd_dev_list_lock);
2306 * The max id could have been updated by rbd_id_get(), in
2307 * which case it now accurately reflects the new maximum.
2308 * Be careful not to overwrite the maximum value in that
/* cmpxchg only succeeds if rbd_id_max still holds the id we removed */
2311 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2315 * Skips over white space at *buf, and updates *buf to point to the
2316 * first found non-space character (if any). Returns the length of
2317 * the token (string of non-white space characters) found. Note
2318 * that *buf must be terminated with '\0'.
2320 static inline size_t next_token(const char **buf)
2323 * These are the characters that produce nonzero for
2324 * isspace() in the "C" and "POSIX" locales.
2326 const char *spaces = " \f\n\r\t\v";
2328 *buf += strspn(*buf, spaces); /* Find start of token */
2330 return strcspn(*buf, spaces); /* Return token length */
2334 * Finds the next token in *buf, and if the provided token buffer is
2335 * big enough, copies the found token into it. The result, if
2336 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2337 * must be terminated with '\0' on entry.
2339 * Returns the length of the token found (not including the '\0').
2340 * Return value will be 0 if no token is found, and it will be >=
2341 * token_size if the token would not fit.
2343 * The *buf pointer will be updated to point beyond the end of the
2344 * found token. Note that this occurs even if the token buffer is
2345 * too small to hold it.
2347 static inline size_t copy_token(const char **buf,
2353 len = next_token(buf);
/* copy only when the token (plus NUL) fits in the caller's buffer */
2354 if (len < token_size) {
2355 memcpy(token, *buf, len);
2356 *(token + len) = '\0';
2364 * Finds the next token in *buf, dynamically allocates a buffer big
2365 * enough to hold a copy of it, and copies the token into the new
2366 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2367 * that a duplicate buffer is created even for a zero-length token.
2369 * Returns a pointer to the newly-allocated duplicate, or a null
2370 * pointer if memory for the duplicate was not available. If
2371 * the lenp argument is a non-null pointer, the length of the token
2372 * (not including the '\0') is returned in *lenp.
2374 * If successful, the *buf pointer will be updated to point beyond
2375 * the end of the found token.
2377 * Note: uses GFP_KERNEL for allocation.
2379 static inline char *dup_token(const char **buf, size_t *lenp)
2384 len = next_token(buf);
/* +1 for the terminating NUL; caller owns (and frees) the duplicate */
2385 dup = kmalloc(len + 1, GFP_KERNEL);
2389 memcpy(dup, *buf, len);
2390 *(dup + len) = '\0';
2400 * This fills in the pool_name, image_name, image_name_len, snap_name,
2401 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2402 * on the list of monitor addresses and other options provided via
2405 * Note: rbd_dev is assumed to have been initially zero-filled.
2407 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2409 const char **mon_addrs,
2410 size_t *mon_addrs_size,
2412 size_t options_size)
2417 /* The first four tokens are required */
/* token 1: monitor address list (returned by reference, not copied) */
2419 len = next_token(&buf);
2422 *mon_addrs_size = len + 1;
/* token 2: mount-style options string, copied into caller's buffer */
2427 len = copy_token(&buf, options, options_size);
2428 if (!len || len >= options_size)
/* token 3: pool name */
2432 rbd_dev->pool_name = dup_token(&buf, NULL);
2433 if (!rbd_dev->pool_name)
/* token 4: image name */
2436 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2437 if (!rbd_dev->image_name)
2440 /* Create the name of the header object */
2442 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2443 + sizeof (RBD_SUFFIX),
2445 if (!rbd_dev->header_name)
2447 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2450 * The snapshot name is optional. If none is supplied,
2451 * we use the default value.
2453 rbd_dev->snap_name = dup_token(&buf, &len);
2454 if (!rbd_dev->snap_name)
2457 /* Replace the empty name with the default */
2458 kfree(rbd_dev->snap_name)
2460 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2461 if (!rbd_dev->snap_name)
2464 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2465 sizeof (RBD_SNAP_HEAD_NAME));
/* error unwinding: free in reverse order of allocation, NULL out */
2471 kfree(rbd_dev->header_name);
2472 rbd_dev->header_name = NULL;
2473 kfree(rbd_dev->image_name);
2474 rbd_dev->image_name = NULL;
2475 rbd_dev->image_name_len = 0;
2476 kfree(rbd_dev->pool_name);
2477 rbd_dev->pool_name = NULL;
/*
 * Bus-level 'add' handler: map a new rbd image.  Parses the user's
 * add string, connects a ceph client, resolves the pool, registers a
 * block major, registers the device with sysfs, sets up the gendisk,
 * and starts the header watch.  After rbd_bus_add_dev() succeeds,
 * error cleanup is delegated to the device release path.
 */
2482 static ssize_t rbd_add(struct bus_type *bus,
2487 struct rbd_device *rbd_dev = NULL;
2488 const char *mon_addrs = NULL;
2489 size_t mon_addrs_size = 0;
2490 struct ceph_osd_client *osdc;
/* pin the module until teardown (released in rbd_dev_release()) */
2493 if (!try_module_get(THIS_MODULE))
2496 options = kmalloc(count, GFP_KERNEL);
2499 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2503 /* static rbd_device initialization */
2504 spin_lock_init(&rbd_dev->lock);
2505 INIT_LIST_HEAD(&rbd_dev->node);
2506 INIT_LIST_HEAD(&rbd_dev->snaps);
2507 init_rwsem(&rbd_dev->header_rwsem);
2509 /* generate unique id: find highest unique id, add one */
2510 rbd_id_get(rbd_dev);
2512 /* Fill in the device name, now that we have its id. */
2513 BUILD_BUG_ON(DEV_NAME_LEN
2514 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2515 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2517 /* parse add command */
2518 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* connect to (or share) a ceph client for the given monitors */
2523 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* resolve the pool name to its numeric id via the osdmap */
2528 osdc = &rbd_dev->rbd_client->client->osdc;
2529 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2531 goto err_out_client;
2532 rbd_dev->pool_id = rc;
2534 /* register our block device */
2535 rc = register_blkdev(0, rbd_dev->name);
2537 goto err_out_client;
2538 rbd_dev->major = rc;
2540 rc = rbd_bus_add_dev(rbd_dev);
2542 goto err_out_blkdev;
2545 * At this point cleanup in the event of an error is the job
2546 * of the sysfs code (initiated by rbd_bus_del_dev()).
2548 * Set up and announce blkdev mapping.
2550 rc = rbd_init_disk(rbd_dev);
2554 rc = rbd_init_watch_dev(rbd_dev);
2561 /* this will also clean up rest of rbd_dev stuff */
2563 rbd_bus_del_dev(rbd_dev);
2568 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2570 rbd_put_client(rbd_dev);
/* parse_args may have partially populated the name fields */
2572 if (rbd_dev->pool_name) {
2573 kfree(rbd_dev->snap_name);
2574 kfree(rbd_dev->header_name);
2575 kfree(rbd_dev->image_name);
2576 kfree(rbd_dev->pool_name);
2578 rbd_id_put(rbd_dev);
2583 dout("Error adding device %s\n", buf);
2584 module_put(THIS_MODULE);
2586 return (ssize_t) rc;
/*
 * Look up a mapped rbd device by its numeric id.  Returns the device
 * or (via the elided fall-through) NULL if no match is found.
 */
2589 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2591 struct list_head *tmp;
2592 struct rbd_device *rbd_dev;
2594 spin_lock(&rbd_dev_list_lock);
2595 list_for_each(tmp, &rbd_dev_list) {
2596 rbd_dev = list_entry(tmp, struct rbd_device, node);
2597 if (rbd_dev->dev_id == dev_id) {
2598 spin_unlock(&rbd_dev_list_lock);
2602 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: runs when the last reference to the
 * rbd device is dropped.  Tears down the header watch, releases the
 * ceph client, frees the disk and blkdev major, returns the id, and
 * finally releases the module reference taken in rbd_add().
 */
2606 static void rbd_dev_release(struct device *dev)
2608 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* stop the lingering watch request before unwatching */
2610 if (rbd_dev->watch_request) {
2611 struct ceph_client *client = rbd_dev->rbd_client->client;
2613 ceph_osdc_unregister_linger_request(&client->osdc,
2614 rbd_dev->watch_request);
2616 if (rbd_dev->watch_event)
2617 rbd_req_sync_unwatch(rbd_dev);
2619 rbd_put_client(rbd_dev);
2621 /* clean up and free blkdev */
2622 rbd_free_disk(rbd_dev);
2623 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2625 /* done with the id, and with the rbd_dev */
2626 kfree(rbd_dev->snap_name);
2627 kfree(rbd_dev->header_name);
2628 kfree(rbd_dev->pool_name);
2629 kfree(rbd_dev->image_name);
2630 rbd_id_put(rbd_dev);
2633 /* release module ref */
2634 module_put(THIS_MODULE);
/*
 * Bus-level 'remove' handler: unmap the rbd device whose numeric id
 * was written to the sysfs file.  Snapshots are removed first, then
 * the device itself (whose release does the rest of the teardown).
 */
2637 static ssize_t rbd_remove(struct bus_type *bus,
2641 struct rbd_device *rbd_dev = NULL;
2646 rc = strict_strtoul(buf, 10, &ul);
2650 /* convert to int; abort if we lost anything in the conversion */
2651 target_id = (int) ul;
2652 if (target_id != ul)
2655 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2657 rbd_dev = __rbd_get_dev(target_id);
2663 __rbd_remove_all_snaps(rbd_dev);
2664 rbd_bus_del_dev(rbd_dev);
2667 mutex_unlock(&ctl_mutex);
/*
 * sysfs 'create_snap' store hook: create a snapshot named by the
 * written string, refresh the header to pick it up, then notify other
 * watchers of the header change.
 */
2671 static ssize_t rbd_snap_add(struct device *dev,
2672 struct device_attribute *attr,
2676 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* local NUL-terminated copy of the (not necessarily terminated) buf */
2678 char *name = kmalloc(count + 1, GFP_KERNEL);
2682 snprintf(name, count, "%s", buf);
2684 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2686 ret = rbd_header_add_snap(rbd_dev,
2691 ret = __rbd_refresh_header(rbd_dev, NULL);
2695 /* shouldn't hold ctl_mutex when notifying.. notify might
2696 trigger a watch callback that would need to get that mutex */
2697 mutex_unlock(&ctl_mutex);
2699 /* make a best effort, don't error if failed */
2700 rbd_req_sync_notify(rbd_dev);
2707 mutex_unlock(&ctl_mutex);
2713 * create control files in sysfs
2716 static int rbd_sysfs_init(void)
2720 ret = device_register(&rbd_root_dev);
2724 ret = bus_register(&rbd_bus_type);
/* unwind the root device if bus registration failed */
2726 device_unregister(&rbd_root_dev);
/* Tear down the sysfs pieces in reverse order of rbd_sysfs_init(). */
2731 static void rbd_sysfs_cleanup(void)
2733 bus_unregister(&rbd_bus_type);
2734 device_unregister(&rbd_root_dev);
/* Module init: register the sysfs bus/root device and announce. */
2737 int __init rbd_init(void)
2741 rc = rbd_sysfs_init();
2744 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs bus and root device. */
2748 void __exit rbd_exit(void)
2750 rbd_sysfs_cleanup();
/* Module entry/exit registration and metadata. */
2753 module_init(rbd_init);
2754 module_exit(rbd_exit);
2756 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2757 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2758 MODULE_DESCRIPTION("rados block device");
2760 /* following authorship retained from original osdblk.c */
2761 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2763 MODULE_LICENSE("GPL");