2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
54 #define RBD_SNAP_HEAD_NAME "-"
56 #define DEV_NAME_LEN 32
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
61 * block device image metadata (in-memory version)
/*
 * In-memory image header. Only part of the definition is visible here.
 * The visible members are the rw-semaphore taken around snapshot updates,
 * the ceph snapshot context, and the byte length of the snapshot-name
 * buffer (used as the kmalloc size in rbd_header_from_disk()).
 */
63 struct rbd_image_header {
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc;
71 size_t snap_names_len;
86 * an instance of the client. multiple devices may share a client.
89 struct ceph_client *client;
90 struct rbd_options *rbd_opts;
92 struct list_head node;
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
106 struct rbd_req_coll *coll;
109 struct rbd_req_status {
116 * a collection of requests
/*
 * Per-request collection of segment statuses. status[] is a trailing array
 * whose element count is chosen at allocation time (rbd_alloc_coll() adds
 * num_reqs * sizeof(struct rbd_req_status) to the allocation size).
 */
118 struct rbd_req_coll {
122 struct rbd_req_status status[0];
129 struct list_head node;
137 int id; /* blkdev unique id */
139 int major; /* blkdev assigned major */
140 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q;
143 struct ceph_client *client;
144 struct rbd_client *rbd_client;
146 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
148 spinlock_t lock; /* queue lock */
150 struct rbd_image_header header;
151 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
153 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154 char pool_name[RBD_MAX_POOL_NAME_LEN];
157 struct ceph_osd_event *watch_event;
158 struct ceph_osd_request *watch_request;
160 char snap_name[RBD_MAX_SNAP_NAME_LEN];
161 u32 cur_snap; /* index+1 of current snapshot within snap context
165 struct list_head node;
167 /* list of snapshots */
168 struct list_head snaps;
174 static struct bus_type rbd_bus_type = {
178 static spinlock_t node_lock; /* protects client get/put */
180 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list); /* devices */
182 static LIST_HEAD(rbd_client_list); /* clients */
/* Forward declarations for helpers and sysfs store handlers used before their definitions. */
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_rollback(struct device *dev,
187 struct device_attribute *attr,
190 static ssize_t rbd_snap_add(struct device *dev,
191 struct device_attribute *attr,
/* NOTE(review): the declaration below ends with two consecutive semicolons; one is redundant. */
194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195 struct rbd_snap *snap);;
/* Map an embedded struct device back to the rbd_device that contains it. */
198 static struct rbd_device *dev_to_rbd(struct device *dev)
200 return container_of(dev, struct rbd_device, dev);
/* Take a reference on the struct device embedded in rbd_dev; returns whatever get_device() returns. */
203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
205 return get_device(&rbd_dev->dev);
/* Drop a reference on the struct device embedded in rbd_dev. */
208 static void rbd_put_dev(struct rbd_device *rbd_dev)
210 put_device(&rbd_dev->dev);
213 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
/*
 * Block-device open hook. Takes a device reference, mirrors the read_only
 * flag of the rbd device onto the block device, and tests for a write open
 * of a read-only device.
 *
 * NOTE(review): the reference is taken before the FMODE_WRITE/read_only
 * test. The body of that branch is not visible here; if it returns an
 * error, confirm the reference taken by rbd_get_dev() is dropped first.
 */
215 static int rbd_open(struct block_device *bdev, fmode_t mode)
217 struct gendisk *disk = bdev->bd_disk;
218 struct rbd_device *rbd_dev = disk->private_data;
220 rbd_get_dev(rbd_dev);
222 set_device_ro(bdev, rbd_dev->read_only);
224 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
/* Block-device release hook: drops the device reference that rbd_open() took. */
230 static int rbd_release(struct gendisk *disk, fmode_t mode)
232 struct rbd_device *rbd_dev = disk->private_data;
234 rbd_put_dev(rbd_dev);
/* Block-device operations table; the visible entries set the owning module and the release hook. */
239 static const struct block_device_operations rbd_bd_ops = {
240 .owner = THIS_MODULE,
242 .release = rbd_release,
246 * Initialize an rbd client instance.
/*
 * Allocate an rbd_client, create a ceph client from opt, open a session on
 * it, record rbd_opts, and append the new client to rbd_client_list under
 * node_lock.
 *
 * Once ceph_create_client() has succeeded, opt is set to NULL because the
 * ceph client is then responsible for it; the later
 * ceph_destroy_options(opt) is therefore only meaningful on the path where
 * client creation failed (the labels themselves are not visible here).
 */
249 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250 struct rbd_options *rbd_opts)
252 struct rbd_client *rbdc;
255 dout("rbd_client_create\n");
256 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
260 kref_init(&rbdc->kref);
261 INIT_LIST_HEAD(&rbdc->node);
263 rbdc->client = ceph_create_client(opt, rbdc);
264 if (IS_ERR(rbdc->client))
266 opt = NULL; /* Now rbdc->client is responsible for opt */
268 ret = ceph_open_session(rbdc->client);
272 rbdc->rbd_opts = rbd_opts;
/* Publish the client on the global list so later mappings can find it. */
274 spin_lock(&node_lock);
275 list_add_tail(&rbdc->node, &rbd_client_list);
276 spin_unlock(&node_lock);
278 dout("rbd_client_create created %p\n", rbdc);
282 ceph_destroy_client(rbdc->client);
287 ceph_destroy_options(opt);
292 * Find a ceph client with specific addr and configuration.
/*
 * Search rbd_client_list for a client whose options compare equal to opt
 * (ceph_compare_options() == 0). CEPH_OPT_NOSHARE is tested first; the
 * outcome of that branch is not visible here (presumably no client is
 * returned - confirm). The caller visible in this file, rbd_get_client(),
 * holds node_lock around the call.
 */
294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
296 struct rbd_client *client_node;
298 if (opt->flags & CEPH_OPT_NOSHARE)
301 list_for_each_entry(client_node, &rbd_client_list, node)
302 if (ceph_compare_options(opt, client_node->client) == 0)
315 /* string args above */
/* Option pattern table handed to match_token(); maps the notify_timeout=<int> pattern to Opt_notify_timeout. */
318 static match_table_t rbdopt_tokens = {
319 {Opt_notify_timeout, "notify_timeout=%d"},
321 /* string args above */
/*
 * Parse one rbd-specific option token into the rbd_options passed as
 * private. It is passed to ceph_parse_options() as the extra-token parser
 * (see rbd_get_client()).
 *
 * Tokens below Opt_last_int are parsed as integers with match_int();
 * tokens between Opt_last_int and Opt_last_string are treated as strings.
 * The only assignment visible here stores the integer value of
 * Opt_notify_timeout in rbdopt->notify_timeout.
 */
325 static int parse_rbd_opts_token(char *c, void *private)
327 struct rbd_options *rbdopt = private;
328 substring_t argstr[MAX_OPT_ARGS];
329 int token, intval, ret;
331 token = match_token((char *)c, rbdopt_tokens, argstr);
335 if (token < Opt_last_int) {
336 ret = match_int(&argstr[0], &intval);
338 pr_err("bad mount option arg (not int) "
342 dout("got int token %d val %d\n", token, intval);
343 } else if (token > Opt_last_int && token < Opt_last_string) {
344 dout("got string token %d val %s\n", token,
347 dout("got token %d\n", token);
351 case Opt_notify_timeout:
352 rbdopt->notify_timeout = intval;
361 * Get a ceph client with specific addr and configuration, if one does
362 * not exist create it.
/*
 * Attach a ceph client to rbd_dev. Options are parsed from the option
 * string and monitor address; if an existing client with equal options is
 * found under node_lock it is reused (with an extra kref), otherwise a new
 * one is created with rbd_client_create().
 *
 * NOTE(review): on the reuse path the freshly parsed ceph options are
 * destroyed, but no release of the rbd_opts allocated at the top of the
 * function is visible here - confirm it is not leaked.
 */
364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
367 struct rbd_client *rbdc;
368 struct ceph_options *opt;
370 struct rbd_options *rbd_opts;
372 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
376 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
378 ret = ceph_parse_options(&opt, options, mon_addr,
379 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
/* The lookup and the kref_get() happen under the same lock hold. */
383 spin_lock(&node_lock);
384 rbdc = __rbd_client_find(opt);
386 ceph_destroy_options(opt);
388 /* using an existing client */
389 kref_get(&rbdc->kref);
390 rbd_dev->rbd_client = rbdc;
391 rbd_dev->client = rbdc->client;
392 spin_unlock(&node_lock);
395 spin_unlock(&node_lock);
397 rbdc = rbd_client_create(opt, rbd_opts);
403 rbd_dev->rbd_client = rbdc;
404 rbd_dev->client = rbdc->client;
412 * Destroy ceph client
/*
 * kref release callback for an rbd_client (passed to kref_put() in
 * rbd_put_client()). Unlinks the client from rbd_client_list under
 * node_lock, destroys the underlying ceph client and frees the rbd
 * options. Freeing of the rbd_client structure itself is not visible here.
 */
414 static void rbd_client_release(struct kref *kref)
416 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
418 dout("rbd_release_client %p\n", rbdc);
419 spin_lock(&node_lock);
420 list_del(&rbdc->node);
421 spin_unlock(&node_lock);
423 ceph_destroy_client(rbdc->client);
424 kfree(rbdc->rbd_opts);
429 * Drop reference to ceph client node. If it's not referenced anymore, release
/* Drop the reference rbd_dev holds on its rbd_client and clear both client pointers in rbd_dev. */
432 static void rbd_put_client(struct rbd_device *rbd_dev)
434 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435 rbd_dev->rbd_client = NULL;
436 rbd_dev->client = NULL;
440 * Destroy requests collection
/*
 * kref release callback for a request collection. Recovers the collection
 * from its embedded kref and logs it; the actual free is not visible here.
 */
442 static void rbd_coll_release(struct kref *kref)
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
447 dout("rbd_coll_release %p\n", coll);
452 * Create a new header structure, translate header format from the on-disk
/*
 * Build the in-memory header from the little-endian on-disk header.
 *
 * Allocates the snapshot context, the snapshot-name buffer and the
 * snapshot-size array, copies the scalar fields (block name, image size,
 * object order, crypt and compression type, snapshot sequence) and, when
 * the allocated snapshot count matches, converts each on-disk snapshot id
 * and size to CPU byte order. The visible tail frees snap_names and snapc,
 * presumably on the error path.
 *
 * NOTE(review): snap_count and snap_names_len come from the on-disk header
 * and are used directly in allocation sizes; no overflow or sanity check is
 * visible here.
 */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456 struct rbd_image_header_ondisk *ondisk,
461 u32 snap_count = le32_to_cpu(ondisk->snap_count);
464 init_rwsem(&header->snap_rwsem);
465 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
468 sizeof(struct rbd_image_snap_ondisk),
473 header->snap_names = kmalloc(header->snap_names_len,
475 if (!header->snap_names)
477 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
479 if (!header->snap_sizes)
482 header->snap_names = NULL;
483 header->snap_sizes = NULL;
485 memcpy(header->block_name, ondisk->block_name,
486 sizeof(ondisk->block_name));
488 header->image_size = le64_to_cpu(ondisk->image_size);
489 header->obj_order = ondisk->options.order;
490 header->crypt_type = ondisk->options.crypt_type;
491 header->comp_type = ondisk->options.comp_type;
493 atomic_set(&header->snapc->nref, 1);
494 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 header->snapc->num_snaps = snap_count;
496 header->total_snaps = snap_count;
499 allocated_snaps == snap_count) {
500 for (i = 0; i < snap_count; i++) {
501 header->snapc->snaps[i] =
502 le64_to_cpu(ondisk->snaps[i].id);
503 header->snap_sizes[i] =
504 le64_to_cpu(ondisk->snaps[i].image_size);
/*
 * The copy below uses the loop index i as left by the loop above, so the
 * source is the first byte past the last on-disk snapshot record
 * (assuming this runs after the loop ends - the closing brace is not
 * visible here).
 */
507 /* copy snapshot names */
508 memcpy(header->snap_names, &ondisk->snaps[i],
509 header->snap_names_len);
515 kfree(header->snap_names);
517 kfree(header->snapc);
/* Convert a cur_snap style value (index+1 counted from the end) into an index into the snapshot arrays. */
521 static int snap_index(struct rbd_image_header *header, int snap_num)
523 return header->total_snaps - snap_num;
/*
 * Return the snapshot id the device is currently mapped at, looked up in
 * the snapshot context through snap_index(). The value returned when
 * cur_snap is zero is not visible here.
 */
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
528 struct rbd_image_header *header = &rbd_dev->header;
530 if (!rbd_dev->cur_snap)
533 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
/*
 * Look up a snapshot by name. snap_names is walked as a sequence of
 * NUL-terminated strings (each step advances by strlen + 1), in parallel
 * with the snapshot index. When found, the snapshot id is stored through
 * seq and its size through size; the conditions guarding those stores and
 * the return values are not visible here.
 */
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
540 char *p = header->snap_names;
542 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543 if (strcmp(snap_name, p) == 0)
/* i reaching total_snaps means no name matched. */
546 if (i == header->total_snaps)
549 *seq = header->snapc->snaps[i];
552 *size = header->snap_sizes[i];
/*
 * Select which snapshot (or the head) the device is mapped at, under the
 * write side of snap_rwsem. For the head, the snapshot context sequence is
 * taken from header->snap_seq when snapshots exist and the reported size is
 * the image size; otherwise the snapshot is resolved with snap_by_name()
 * and cur_snap is set to total_snaps minus the returned index.
 *
 * NOTE(review): RBD_SNAP_HEAD_NAME is defined as "-" in this file, so the
 * two visible strcmp() tests in the head condition compare against the same
 * string; one of them is redundant.
 */
557 static int rbd_header_set_snap(struct rbd_device *dev,
558 const char *snap_name,
561 struct rbd_image_header *header = &dev->header;
562 struct ceph_snap_context *snapc = header->snapc;
565 down_write(&header->snap_rwsem);
569 strcmp(snap_name, "-") == 0 ||
570 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571 if (header->total_snaps)
572 snapc->seq = header->snap_seq;
578 *size = header->image_size;
580 ret = snap_by_name(header, snap_name, &snapc->seq, size);
584 dev->cur_snap = header->total_snaps - ret;
590 up_write(&header->snap_rwsem);
/* Free the three dynamically allocated parts of an in-memory header: snapshot context, names and sizes. */
594 static void rbd_header_free(struct rbd_image_header *header)
596 kfree(header->snapc);
597 kfree(header->snap_names);
598 kfree(header->snap_sizes);
602 * get the actual striped segment name, offset and length
/*
 * Translate an image offset/length into a single object segment: the
 * segment number is ofs >> obj_order, the segment name is formatted as
 * <block_name>.<12 hex digits>, the offset is reduced to the offset within
 * the object, and the length is clamped so it does not cross the object
 * boundary.
 *
 * NOTE(review): the masks are built with (1 << obj_order), an int-width
 * shift. If obj_order can reach 31 or more this overflows; a 64-bit
 * constant (1ULL) would be safer - confirm the permitted range of
 * obj_order.
 */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605 const char *block_name,
607 char *seg_name, u64 *segofs)
609 u64 seg = ofs >> header->obj_order;
612 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613 "%s.%012llx", block_name, seg);
615 ofs = ofs & ((1 << header->obj_order) - 1);
616 len = min_t(u64, len, (1 << header->obj_order) - ofs);
/*
 * Number of object segments touched by the byte range [ofs, ofs + len).
 * NOTE(review): with len == 0 the expression ofs + len - 1 refers to the
 * byte before ofs; presumably callers never pass a zero length - confirm.
 */
624 static int rbd_get_num_segments(struct rbd_image_header *header,
627 u64 start_seg = ofs >> header->obj_order;
628 u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 return end_seg - start_seg + 1;
/* Walk a bio chain through bi_next; the per-bio release call is not visible here. */
636 static void bio_chain_put(struct bio *chain)
642 chain = chain->bi_next;
648 * zeros a bio chain, starting at specific offset
/*
 * Zero the data of a bio chain from byte offset start_ofs onward. For each
 * segment that extends past start_ofs, the part at or after start_ofs is
 * cleared through a temporary atomic mapping (bvec_kmap_irq /
 * bvec_kunmap_irq).
 */
650 static void zero_bio_chain(struct bio *chain, int start_ofs)
659 bio_for_each_segment(bv, chain, i) {
660 if (pos + bv->bv_len > start_ofs) {
/* remainder is how many leading bytes of this segment lie before start_ofs. */
661 int remainder = max(start_ofs - pos, 0);
662 buf = bvec_kmap_irq(bv, &flags);
663 memset(buf + remainder, 0,
664 bv->bv_len - remainder);
665 bvec_kunmap_irq(buf, &flags);
670 chain = chain->bi_next;
675 * bio_chain_clone - clone a chain of bios up to a certain length.
676 * might return a bio_pair that will need to be released.
/*
 * Clone bios from the chain at *old until len bytes are covered. If the
 * bio that crosses the len boundary must be split, bio_split() is used and
 * the first half is cloned. Any bio_pair left from a previous call is
 * released on entry. *next is set to where the caller should continue. On
 * error the partially built chain is released with bio_chain_put().
 *
 * NOTE(review): the parameter bp has type struct bio_pair **, yet the
 * visible code assigns the bio_split() result to bp and dereferences
 * bp->bio1. Unless a local variable named bp is declared on a line not
 * visible here (which would then shadow the parameter and never hand the
 * pair back to the caller), this should operate on *bp.
 */
678 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679 struct bio_pair **bp,
680 int len, gfp_t gfpmask)
682 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
686 bio_pair_release(*bp);
690 while (old_chain && (total < len)) {
691 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
695 if (total + old_chain->bi_size > len) {
699 * this split can only happen with a single paged bio,
700 * split_bio will BUG_ON if this is not the case
702 dout("bio_chain_clone split! total=%d remaining=%d"
704 (int)total, (int)len-total,
705 (int)old_chain->bi_size);
707 /* split the bio. We'll release it either in the next
708 call, or it will have to be released outside */
709 bp = bio_split(old_chain, (len - total) / 512ULL);
713 __bio_clone(tmp, &bp->bio1);
717 __bio_clone(tmp, old_chain);
718 *next = old_chain->bi_next;
/* After the first allocation, later ones must not sleep. */
722 gfpmask &= ~__GFP_WAIT;
726 new_chain = tail = tmp;
731 old_chain = old_chain->bi_next;
733 total += tmp->bi_size;
739 tail->bi_next = NULL;
746 dout("bio_chain_clone with err\n");
747 bio_chain_put(new_chain);
752 * helpers for osd request op vectors.
/*
 * Allocate a zeroed vector of num_ops + 1 osd request ops (the extra
 * element is left zeroed) and initialise the first op with the given
 * opcode and payload length. Extent offset and length are filled in later
 * by the layout calculation.
 */
754 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
759 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
763 (*ops)[0].op = opcode;
765 * op extent offset and length will be set later on
766 * in calc_raw_layout()
768 (*ops)[0].payload_len = payload_len;
772 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion of segment number index of a block request and
 * complete as much of the request as is now contiguous.
 *
 * A blk_end_request() call is visible before the queue lock is taken,
 * presumably for requests that have no collection - confirm. Otherwise,
 * under the queue lock, the status slot is marked done, then every
 * consecutive finished slot starting at coll->num_done is completed in
 * order with __blk_end_request(), dropping one collection reference per
 * slot.
 */
777 static void rbd_coll_end_req_index(struct request *rq,
778 struct rbd_req_coll *coll,
782 struct request_queue *q;
785 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 coll, index, ret, len);
792 blk_end_request(rq, ret, len);
798 spin_lock_irq(q->queue_lock);
799 coll->status[index].done = 1;
800 coll->status[index].rc = ret;
801 coll->status[index].bytes = len;
/* Find the run of finished slots that starts at the first unreported one. */
802 max = min = coll->num_done;
803 while (max < coll->total && coll->status[max].done)
806 for (i = min; i<max; i++) {
807 __blk_end_request(rq, coll->status[i].rc,
808 coll->status[i].bytes);
810 kref_put(&coll->kref, rbd_coll_release);
812 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper: complete the segment described by an rbd_request using its stored rq, coll and coll_index. */
815 static void rbd_coll_end_req(struct rbd_request *req,
818 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
822 * Send ceph osd request
/*
 * Build and submit one osd request for object obj.
 *
 * Allocates the per-request bookkeeping (rbd_request), allocates the osd
 * request while holding the read side of snap_rwsem, fills in the object
 * name and a fixed single-stripe file layout, and starts the request. When
 * linger_req is used the request is registered as lingering. On the
 * synchronous path the function waits for completion, optionally reports
 * the reassert version through ver, and drops the request. If the initial
 * allocation fails the segment is completed through
 * rbd_coll_end_req_index().
 *
 * NOTE(review): the first dout() format prints ofs then len, but the
 * arguments are passed as len then ofs, so the two values are swapped in
 * the debug output.
 * NOTE(review): strncpy() with sizeof(req->r_oid) does not NUL-terminate
 * when obj is at least that long, and strlen() is applied to the result
 * right after - confirm obj is always shorter than r_oid.
 */
824 static int rbd_do_request(struct request *rq,
825 struct rbd_device *dev,
826 struct ceph_snap_context *snapc,
828 const char *obj, u64 ofs, u64 len,
833 struct ceph_osd_req_op *ops,
835 struct rbd_req_coll *coll,
837 void (*rbd_cb)(struct ceph_osd_request *req,
838 struct ceph_msg *msg),
839 struct ceph_osd_request **linger_req,
842 struct ceph_osd_request *req;
843 struct ceph_file_layout *layout;
846 struct timespec mtime = CURRENT_TIME;
847 struct rbd_request *req_data;
848 struct ceph_osd_request_head *reqhead;
849 struct rbd_image_header *header = &dev->header;
851 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
854 rbd_coll_end_req_index(rq, coll, coll_index,
860 req_data->coll = coll;
861 req_data->coll_index = coll_index;
864 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
866 down_read(&header->snap_rwsem);
868 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
872 GFP_NOIO, pages, bio);
874 up_read(&header->snap_rwsem);
879 req->r_callback = rbd_cb;
883 req_data->pages = pages;
886 req->r_priv = req_data;
888 reqhead = req->r_request->front.iov_base;
889 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
891 strncpy(req->r_oid, obj, sizeof(req->r_oid));
892 req->r_oid_len = strlen(req->r_oid);
/* One stripe per object: stripe unit and object size are both the maximum object size. */
894 layout = &req->r_file_layout;
895 memset(layout, 0, sizeof(*layout));
896 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897 layout->fl_stripe_count = cpu_to_le32(1);
898 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899 layout->fl_pg_preferred = cpu_to_le32(-1);
900 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902 ofs, &len, &bno, req, ops);
904 ceph_osdc_build_request(req, ofs, &len,
908 req->r_oid, req->r_oid_len);
909 up_read(&header->snap_rwsem);
912 ceph_osdc_set_request_linger(&dev->client->osdc, req);
916 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
921 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
923 *ver = le64_to_cpu(req->r_reassert_version.version);
924 dout("reassert_ver=%lld\n",
925 le64_to_cpu(req->r_reassert_version.version));
926 ceph_osdc_put_request(req);
931 bio_chain_put(req_data->bio);
932 ceph_osdc_put_request(req);
934 rbd_coll_end_req(req_data, ret, len);
940 * Ceph osd op callback
/*
 * Completion callback for asynchronous block I/O requests. Decodes the
 * result code and the extent length of the first op from the reply.
 *
 * Reads of a nonexistent object (-ENOENT) have their whole bio chain
 * zeroed; short successful reads have the tail zeroed and the byte count
 * raised to the requested length. The segment is then completed, and the
 * bio chain and the osd request are released.
 */
942 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
944 struct rbd_request *req_data = req->r_priv;
945 struct ceph_osd_reply_head *replyhead;
946 struct ceph_osd_op *op;
952 replyhead = msg->front.iov_base;
953 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
/* The first op immediately follows the reply head in the message front. */
954 op = (void *)(replyhead + 1);
955 rc = le32_to_cpu(replyhead->result);
956 bytes = le64_to_cpu(op->extent.length);
957 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
959 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
961 if (rc == -ENOENT && read_op) {
962 zero_bio_chain(req_data->bio, 0);
964 } else if (rc == 0 && read_op && bytes < req_data->len) {
965 zero_bio_chain(req_data->bio, bytes);
966 bytes = req_data->len;
969 rbd_coll_end_req(req_data, rc, bytes);
972 bio_chain_put(req_data->bio);
974 ceph_osdc_put_request(req);
/* Minimal completion callback: only drops the reference on the osd request. */
978 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
980 ceph_osdc_put_request(req);
984 * Do a synchronous ceph osd operation
/*
 * Perform one osd operation synchronously using a temporary page vector.
 *
 * Pages covering [ofs, ofs + len) are allocated; for writes the caller
 * buffer is copied into them before submission, and for reads the result
 * is copied back out using the byte count returned by rbd_do_request().
 * ops starts out as orig_ops, and rbd_create_rw_ops() is also called
 * here; the condition selecting between the two, and the matching guard
 * on rbd_destroy_ops(), are not visible.
 */
986 static int rbd_req_sync_op(struct rbd_device *dev,
987 struct ceph_snap_context *snapc,
991 struct ceph_osd_req_op *orig_ops,
996 struct ceph_osd_request **linger_req,
1000 struct page **pages;
1002 struct ceph_osd_req_op *ops = orig_ops;
1005 num_pages = calc_pages_for(ofs , len);
1006 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1008 return PTR_ERR(pages);
1011 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1016 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1023 ret = rbd_do_request(NULL, dev, snapc, snapid,
1024 obj, ofs, len, NULL,
1035 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1040 rbd_destroy_ops(ops);
1042 ceph_release_page_vector(pages, num_pages);
1047 * Do an asynchronous ceph osd operation
/*
 * Submit one asynchronous segment I/O. The image offset is mapped to a
 * segment name and in-object offset with rbd_get_segment(), a single-op
 * vector is built, and the request is sent with rbd_req_cb as the
 * completion callback. The ops vector is destroyed after submission.
 */
1049 static int rbd_do_op(struct request *rq,
1050 struct rbd_device *rbd_dev ,
1051 struct ceph_snap_context *snapc,
1053 int opcode, int flags, int num_reply,
1056 struct rbd_req_coll *coll,
1063 struct ceph_osd_req_op *ops;
1066 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1070 seg_len = rbd_get_segment(&rbd_dev->header,
1071 rbd_dev->header.block_name,
1073 seg_name, &seg_ofs);
/* Only writes carry a data payload. */
1075 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1077 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1081 /* we've taken care of segment sizes earlier when we
1082 cloned the bios. We should never have a segment
1083 truncated at this point */
1084 BUG_ON(seg_len < len);
1086 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087 seg_name, seg_ofs, seg_len,
1094 rbd_req_cb, 0, NULL);
1096 rbd_destroy_ops(ops);
1103 * Request async osd write
/* Asynchronous write of one segment: forwards to rbd_do_op() with CEPH_NOSNAP and the WRITE | ONDISK flags. */
1105 static int rbd_req_write(struct request *rq,
1106 struct rbd_device *rbd_dev,
1107 struct ceph_snap_context *snapc,
1110 struct rbd_req_coll *coll,
1113 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1115 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1117 ofs, len, bio, coll, coll_index);
1121 * Request async osd read
/*
 * Asynchronous read of one segment: forwards to rbd_do_op() with no
 * snapshot context. A snapid of zero is translated to CEPH_NOSNAP.
 */
1123 static int rbd_req_read(struct request *rq,
1124 struct rbd_device *rbd_dev,
1128 struct rbd_req_coll *coll,
1131 return rbd_do_op(rq, rbd_dev, NULL,
1132 (snapid ? snapid : CEPH_NOSNAP),
1136 ofs, len, bio, coll, coll_index);
1140 * Request sync osd read
/*
 * Synchronous read of an object range into buf. A snapid of zero is
 * translated to CEPH_NOSNAP.
 * NOTE(review): the snapc parameter is accepted but NULL is passed to
 * rbd_req_sync_op() in its place.
 */
1142 static int rbd_req_sync_read(struct rbd_device *dev,
1143 struct ceph_snap_context *snapc,
1150 return rbd_req_sync_op(dev, NULL,
1151 (snapid ? snapid : CEPH_NOSNAP),
1155 1, obj, ofs, len, buf, NULL, ver);
1159 * Request osd notify ack
/*
 * Acknowledge a notification: builds a CEPH_OSD_OP_NOTIFY_ACK op carrying
 * the current header object version and the notify id, and submits it with
 * rbd_simple_req_cb as the completion callback.
 *
 * NOTE(review): watch.ver is converted with cpu_to_le64() while
 * watch.cookie is assigned notify_id unconverted - confirm the expected
 * byte order of the cookie field.
 */
1161 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1166 struct ceph_osd_req_op *ops;
1167 struct page **pages = NULL;
1170 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1174 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175 ops[0].watch.cookie = notify_id;
1176 ops[0].watch.flag = 0;
1178 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1185 rbd_simple_req_cb, 0, NULL);
1187 rbd_destroy_ops(ops);
/*
 * Watch event callback registered on the header object (see
 * rbd_req_sync_watch()). Refreshes the snapshot information under
 * ctl_mutex and then acknowledges the notification. The warning call
 * visible below reports a failed refresh; its guarding condition is not
 * visible.
 */
1191 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1193 struct rbd_device *dev = (struct rbd_device *)data;
1199 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1200 notify_id, (int)opcode);
1201 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1202 rc = __rbd_update_snaps(dev);
1203 mutex_unlock(&ctl_mutex);
1205 pr_warning(DRV_NAME "%d got notification but failed to update"
1206 " snaps: %d\n", dev->major, rc);
1208 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1212 * Request sync osd watch
/*
 * Register a watch on an object. Creates an osd event bound to
 * rbd_watch_cb (stored in dev->watch_event), then sends a
 * CEPH_OSD_OP_WATCH op with flag = 1 carrying the event cookie; the
 * request is kept through dev->watch_request. The visible tail cancels the
 * event and clears dev->watch_event, presumably on the failure path.
 */
1214 static int rbd_req_sync_watch(struct rbd_device *dev,
1218 struct ceph_osd_req_op *ops;
1219 struct ceph_osd_client *osdc = &dev->client->osdc;
1221 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1225 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1226 (void *)dev, &dev->watch_event);
1230 ops[0].watch.ver = cpu_to_le64(ver);
1231 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1232 ops[0].watch.flag = 1;
1234 ret = rbd_req_sync_op(dev, NULL,
1237 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1240 &dev->watch_request, NULL);
1245 rbd_destroy_ops(ops);
1249 ceph_osdc_cancel_event(dev->watch_event);
1250 dev->watch_event = NULL;
1252 rbd_destroy_ops(ops);
1257 * Request sync osd unwatch
/*
 * Remove the watch: sends a CEPH_OSD_OP_WATCH op with flag = 0 and the
 * cookie of the existing watch event, then cancels that event and clears
 * dev->watch_event. dev->watch_event is dereferenced here with no NULL
 * check visible, so it must already be set.
 */
1259 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1262 struct ceph_osd_req_op *ops;
1264 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1268 ops[0].watch.ver = 0;
1269 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1270 ops[0].watch.flag = 0;
1272 ret = rbd_req_sync_op(dev, NULL,
1275 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1277 1, obj, 0, 0, NULL, NULL, NULL);
1279 rbd_destroy_ops(ops);
1280 ceph_osdc_cancel_event(dev->watch_event);
1281 dev->watch_event = NULL;
/* Context structure whose address is passed as callback data when the notify event is created in rbd_req_sync_notify(). */
1285 struct rbd_notify_info {
1286 struct rbd_device *dev;
/*
 * Callback for the notify event; the visible body only logs.
 *
 * NOTE(review): data is cast directly to struct rbd_device *, but
 * rbd_req_sync_notify() registers this callback with the address of a
 * struct rbd_notify_info as its data. If so, dev->obj_md_name below reads
 * through the wrong type; the device would have to be fetched from the
 * dev member of the info structure instead - confirm.
 */
1289 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1291 struct rbd_device *dev = (struct rbd_device *)data;
1295 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1296 notify_id, (int)opcode);
1300 * Request sync osd notify
/*
 * Send a notify on an object and wait for the resulting event. Creates a
 * one-shot osd event bound to rbd_notify_cb, sends a CEPH_OSD_OP_NOTIFY
 * op with a payload of two u32 values, then waits with
 * ceph_osdc_wait_event(). The visible tail cancels the event, presumably
 * on the failure path.
 *
 * NOTE(review): info lives on this function stack and its address is the
 * event callback data, so the callback must not run after this function
 * returns.
 * NOTE(review): watch.timeout is the literal 12 although the rbd options
 * carry a configurable notify_timeout, and watch.ver and watch.cookie are
 * assigned without cpu_to_le64() unlike rbd_req_sync_watch() - confirm.
 */
1302 static int rbd_req_sync_notify(struct rbd_device *dev,
1305 struct ceph_osd_req_op *ops;
1306 struct ceph_osd_client *osdc = &dev->client->osdc;
1307 struct ceph_osd_event *event;
1308 struct rbd_notify_info info;
1309 int payload_len = sizeof(u32) + sizeof(u32);
1312 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1318 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1319 (void *)&info, &event);
1323 ops[0].watch.ver = 1;
1324 ops[0].watch.flag = 1;
1325 ops[0].watch.cookie = event->cookie;
1326 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1327 ops[0].watch.timeout = 12;
1329 ret = rbd_req_sync_op(dev, NULL,
1332 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1334 1, obj, 0, 0, NULL, NULL, NULL);
1338 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1339 dout("ceph_osdc_wait_event returned %d\n", ret);
1340 rbd_destroy_ops(ops);
1344 ceph_osdc_cancel_event(event);
1346 rbd_destroy_ops(ops);
1351 * Request sync osd rollback
/* Synchronously roll a single object back to snapid using a CEPH_OSD_OP_ROLLBACK op. */
1353 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1357 struct ceph_osd_req_op *ops;
1358 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1362 ops[0].snap.snapid = snapid;
1364 ret = rbd_req_sync_op(dev, NULL,
1367 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1369 1, obj, 0, 0, NULL, NULL, NULL);
1371 rbd_destroy_ops(ops);
1377 * Request sync osd class method call
/*
 * Synchronously invoke an osd class method (CEPH_OSD_OP_CALL) on an
 * object. The payload length is the class name, the method name and len
 * bytes of input data.
 *
 * NOTE(review): the class and method name lengths are narrowed to __u8,
 * so names longer than 255 bytes would be truncated silently; the callers
 * visible in this file pass short literals.
 */
1379 static int rbd_req_sync_exec(struct rbd_device *dev,
1387 struct ceph_osd_req_op *ops;
1388 int cls_len = strlen(cls);
1389 int method_len = strlen(method);
1390 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1391 cls_len + method_len + len);
1395 ops[0].cls.class_name = cls;
1396 ops[0].cls.class_len = (__u8)cls_len;
1397 ops[0].cls.method_name = method;
1398 ops[0].cls.method_len = (__u8)method_len;
1399 ops[0].cls.argc = 0;
1400 ops[0].cls.indata = data;
1401 ops[0].cls.indata_len = len;
1403 ret = rbd_req_sync_op(dev, NULL,
1406 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1408 1, obj, 0, 0, NULL, NULL, ver);
1410 rbd_destroy_ops(ops);
1412 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a zeroed request collection with room for num_reqs status
 * slots, record the slot count in total and initialise its kref (one
 * reference held by the caller).
 */
1416 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1418 struct rbd_req_coll *coll =
1419 kzalloc(sizeof(struct rbd_req_coll) +
1420 sizeof(struct rbd_req_status) * num_reqs,
1425 coll->total = num_reqs;
1426 kref_init(&coll->kref);
1431 * block device queue callback
/*
 * Request-queue callback (installed with blk_init_queue() in
 * rbd_init_disk()). It unlocks and relocks q->queue_lock itself, so it
 * must be entered with that lock held.
 *
 * For each fetched request: non-filesystem requests are completed with
 * status 0, writes to a read-only device are failed with -EROFS, and
 * otherwise the queue lock is dropped while the request is split into
 * per-object segments. Each segment gets a cloned bio chain and an
 * asynchronous osd read or write, tracked through a request collection
 * that holds one reference per segment plus the one from allocation. The
 * lock is retaken before the next request is fetched.
 */
1433 static void rbd_rq_fn(struct request_queue *q)
1435 struct rbd_device *rbd_dev = q->queuedata;
1437 struct bio_pair *bp = NULL;
1439 rq = blk_fetch_request(q);
1443 struct bio *rq_bio, *next_bio = NULL;
1445 int size, op_size = 0;
1447 int num_segs, cur_seg = 0;
1448 struct rbd_req_coll *coll;
1450 /* peek at request from block layer */
1454 dout("fetched request\n");
1456 /* filter out block requests we don't understand */
1457 if ((rq->cmd_type != REQ_TYPE_FS)) {
1458 __blk_end_request_all(rq, 0);
1462 /* deduce our operation (read, write) */
1463 do_write = (rq_data_dir(rq) == WRITE);
1465 size = blk_rq_bytes(rq);
1466 ofs = blk_rq_pos(rq) * 512ULL;
1468 if (do_write && rbd_dev->read_only) {
1469 __blk_end_request_all(rq, -EROFS);
1473 spin_unlock_irq(q->queue_lock);
1475 dout("%s 0x%x bytes at 0x%llx\n",
1476 do_write ? "write" : "read",
1477 size, blk_rq_pos(rq) * 512ULL);
1479 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1480 coll = rbd_alloc_coll(num_segs);
1482 spin_lock_irq(q->queue_lock);
1483 __blk_end_request_all(rq, -ENOMEM);
1488 /* a bio clone to be passed down to OSD req */
1489 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1490 op_size = rbd_get_segment(&rbd_dev->header,
1491 rbd_dev->header.block_name,
/* One collection reference per segment; dropped when that segment completes. */
1494 kref_get(&coll->kref);
1495 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1496 op_size, GFP_ATOMIC);
1498 rbd_coll_end_req_index(rq, coll, cur_seg,
1504 /* init OSD command: write or read */
1506 rbd_req_write(rq, rbd_dev,
1507 rbd_dev->header.snapc,
1512 rbd_req_read(rq, rbd_dev,
1513 cur_snap_id(rbd_dev),
/* Drop the reference taken by rbd_alloc_coll(). */
1525 kref_put(&coll->kref, rbd_coll_release);
1528 bio_pair_release(bp);
1529 spin_lock_irq(q->queue_lock);
1531 rq = blk_fetch_request(q);
1536 * a queue callback. Makes sure that we don't create a bio that spans across
1537 * multiple osd objects. One exception would be with a single page bios,
1538 * which we handle later at bio_chain_clone
/*
 * merge_bvec callback: limits how many bytes may be added to a bio so it
 * stays within one object. chunk_sectors is the object size in 512-byte
 * sectors; max is the number of bytes left in the current object after
 * the bio start sector and the sectors already in the bio. A bio that is
 * still empty is always allowed to take the whole bvec, even if that
 * crosses the boundary.
 */
1540 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1541 struct bio_vec *bvec)
1543 struct rbd_device *rbd_dev = q->queuedata;
1544 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1545 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1546 unsigned int bio_sectors = bmd->bi_size >> 9;
1549 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1550 + bio_sectors)) << 9;
1552 max = 0; /* bio_add cannot handle a negative return */
1553 if (max <= bvec->bv_len && bio_sectors == 0)
1554 return bvec->bv_len;
/*
 * Tear down the gendisk side of an rbd device: frees the in-memory header
 * and cleans up the request queue. The action taken when GENHD_FL_UP is
 * set is not visible here.
 */
1558 static void rbd_free_disk(struct rbd_device *rbd_dev)
1560 struct gendisk *disk = rbd_dev->disk;
1565 rbd_header_free(&rbd_dev->header);
1567 if (disk->flags & GENHD_FL_UP)
1570 blk_cleanup_queue(disk->queue);
1575 * reload the on-disk header
/*
 * Read the on-disk header object and convert it into header.
 *
 * The read buffer is sized from the snapshot count and name length known
 * so far (initially zero). After conversion, if the header reports a
 * different snapshot count, the count and name length are updated and the
 * converted header is freed; presumably the read is then repeated with a
 * larger buffer (the loop construct is not visible here). On success the
 * object version returned by the read is stored in header->obj_version.
 */
1577 static int rbd_read_header(struct rbd_device *rbd_dev,
1578 struct rbd_image_header *header)
1581 struct rbd_image_header_ondisk *dh;
1583 u64 snap_names_len = 0;
1587 int len = sizeof(*dh) +
1588 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1592 dh = kmalloc(len, GFP_KERNEL);
1596 rc = rbd_req_sync_read(rbd_dev,
1598 rbd_dev->obj_md_name,
1604 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1608 if (snap_count != header->total_snaps) {
1609 snap_count = header->total_snaps;
1610 snap_names_len = header->snap_names_len;
1611 rbd_header_free(header);
1617 header->obj_version = ver;
/*
 * Create a new snapshot named snap_name. A fresh snapshot id is requested
 * from the monitor client, the name and id are encoded into a buffer of
 * name_len + 16 bytes, and the "snap_add" method of the "rbd" class is
 * executed on the header object. On success the snapshot context sequence
 * is advanced to the new id.
 */
1627 static int rbd_header_add_snap(struct rbd_device *dev,
1628 const char *snap_name,
1631 int name_len = strlen(snap_name);
1637 /* we should create a snapshot only if we're pointing at the head */
1641 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1643 dout("created snapid=%lld\n", new_snapid);
1647 data = kmalloc(name_len + 16, gfp_flags);
/* e marks the end of the buffer for the bounds-checked encoders below. */
1652 e = data + name_len + 16;
1654 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1655 ceph_encode_64_safe(&p, e, new_snapid, bad);
1657 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1658 data, p - data, &ver);
1665 dev->header.snapc->seq = new_snapid;
/*
 * Remove every snapshot device on rbd_dev->snaps, always taking the first
 * list entry. The loop ends only when the list is empty, so
 * __rbd_remove_snap_dev() is relied on to unlink the entry.
 */
1672 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1674 struct rbd_snap *snap;
1676 while (!list_empty(&rbd_dev->snaps)) {
1677 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1678 __rbd_remove_snap_dev(rbd_dev, snap);
1683 * re-read the on-disk header and refresh the in-memory snapshot info
/*
 * Re-read the header object and swap the fresh snapshot data into
 * rbd_dev->header. The disk capacity is updated from the new image size;
 * then, under the write side of snap_rwsem, the old snapshot context,
 * names and sizes are freed and replaced by the newly read ones, the
 * snapshot context sequence is restored or advanced, and the snapshot
 * devices are refreshed with __rbd_init_snaps_header(). Callers visible in
 * this file hold ctl_mutex around the call.
 */
1685 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1688 struct rbd_image_header h;
1692 ret = rbd_read_header(rbd_dev, &h);
1697 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1699 down_write(&rbd_dev->header.snap_rwsem);
1701 snap_seq = rbd_dev->header.snapc->seq;
1702 if (rbd_dev->header.total_snaps &&
1703 rbd_dev->header.snapc->snaps[0] == snap_seq)
1704 /* pointing at the head, will need to follow that
1708 kfree(rbd_dev->header.snapc);
1709 kfree(rbd_dev->header.snap_names);
1710 kfree(rbd_dev->header.snap_sizes);
1712 rbd_dev->header.total_snaps = h.total_snaps;
1713 rbd_dev->header.snapc = h.snapc;
1714 rbd_dev->header.snap_names = h.snap_names;
1715 rbd_dev->header.snap_names_len = h.snap_names_len;
1716 rbd_dev->header.snap_sizes = h.snap_sizes;
1718 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1720 rbd_dev->header.snapc->seq = snap_seq;
1722 ret = __rbd_init_snaps_header(rbd_dev);
1724 up_write(&rbd_dev->header.snap_rwsem);
1729 static int rbd_init_disk(struct rbd_device *rbd_dev)
1731 struct gendisk *disk;
1732 struct request_queue *q;
1736 /* contact OSD, request size info about the object being mapped */
/*
 * Overview of rbd_init_disk(): reads the image header, builds the snapshot
 * list, selects the mapped snapshot (which also yields total_size),
 * then allocates the gendisk, creates the request queue with rbd_rq_fn
 * and the device spinlock, installs the merge_bvec hook, and sets the
 * capacity in 512-byte sectors.
 */
1737 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1741 /* no need to lock here, as rbd_dev is not registered yet */
1742 rc = __rbd_init_snaps_header(rbd_dev);
1746 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1750 /* create gendisk info */
1752 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1756 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1758 disk->major = rbd_dev->major;
1759 disk->first_minor = 0;
1760 disk->fops = &rbd_bd_ops;
1761 disk->private_data = rbd_dev;
1765 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1768 blk_queue_merge_bvec(q, rbd_merge_bvec);
1771 q->queuedata = rbd_dev;
1773 rbd_dev->disk = disk;
1776 /* finally, announce the disk to the world */
1777 set_capacity(disk, total_size / 512ULL);
1780 pr_info("%s: added with size 0x%llx\n",
1781 disk->disk_name, (unsigned long long)total_size);
/* sysfs show handler for the size attribute: prints header.image_size as an unsigned decimal followed by a newline. */
1794 static ssize_t rbd_size_show(struct device *dev,
1795 struct device_attribute *attr, char *buf)
1797 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1799 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
/*
 * sysfs show handler for the "major" attribute: writes rbd_dev->major as a
 * decimal number followed by a newline.
 */
1802 static ssize_t rbd_major_show(struct device *dev,
1803 struct device_attribute *attr, char *buf)
1805 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1807 return sprintf(buf, "%d\n", rbd_dev->major);
/*
 * sysfs show handler for the "client_id" attribute: writes the literal
 * prefix "client" followed by the value of ceph_client_id() for this
 * device's client and a newline.
 */
1810 static ssize_t rbd_client_id_show(struct device *dev,
1811 struct device_attribute *attr, char *buf)
1813 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1815 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
/*
 * sysfs show handler for the "pool" attribute: writes rbd_dev->pool_name
 * followed by a newline.
 */
1818 static ssize_t rbd_pool_show(struct device *dev,
1819 struct device_attribute *attr, char *buf)
1821 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1823 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/*
 * sysfs show handler for the "name" attribute: writes rbd_dev->obj (the
 * object name given when the device was added) followed by a newline.
 */
1826 static ssize_t rbd_name_show(struct device *dev,
1827 struct device_attribute *attr, char *buf)
1829 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1831 return sprintf(buf, "%s\n", rbd_dev->obj);
/*
 * sysfs show handler for the "current_snap" attribute: writes
 * rbd_dev->snap_name followed by a newline.
 */
1834 static ssize_t rbd_snap_show(struct device *dev,
1835 struct device_attribute *attr,
1838 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1840 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs store handler for the "refresh" attribute: re-reads the snapshot
 * state of the device by calling __rbd_update_snaps() while holding
 * ctl_mutex.  The written data itself is not used by the visible code.
 */
1843 static ssize_t rbd_image_refresh(struct device *dev,
1844 struct device_attribute *attr,
1848 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1852 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1854 rc = __rbd_update_snaps(rbd_dev);
1858 mutex_unlock(&ctl_mutex);
/*
 * sysfs attributes of an rbd device.  Entries created with S_IRUGO are
 * readable by everyone and have only a show handler; entries created with
 * S_IWUSR are writable by the owner and have only a store handler.
 */
1862 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1863 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1864 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1865 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1866 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1867 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1868 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1869 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1870 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
/*
 * Attribute table for an rbd device: one entry for each dev_attr_* object
 * declared by the DEVICE_ATTR() lines for the device.
 */
1872 static struct attribute *rbd_attrs[] = {
1873 &dev_attr_size.attr,
1874 &dev_attr_major.attr,
1875 &dev_attr_client_id.attr,
1876 &dev_attr_pool.attr,
1877 &dev_attr_name.attr,
1878 &dev_attr_current_snap.attr,
1879 &dev_attr_refresh.attr,
1880 &dev_attr_create_snap.attr,
1881 &dev_attr_rollback_snap.attr,
/*
 * Attribute group for an rbd device.
 * NOTE(review): presumably wraps rbd_attrs; the initializer is not visible
 * here, confirm.
 */
1885 static struct attribute_group rbd_attr_group = {
/*
 * Group list installed as rbd_device_type.groups.
 * NOTE(review): presumably lists rbd_attr_group; the initializer is not
 * visible here, confirm.
 */
1889 static const struct attribute_group *rbd_attr_groups[] = {
/*
 * Release callback installed as rbd_device_type.release.  The body is not
 * visible here.
 */
1894 static void rbd_sysfs_dev_release(struct device *dev)
/*
 * Device type assigned to every rbd device in rbd_bus_add_dev(): supplies
 * the sysfs attribute groups and the type-level release callback.
 */
1898 static struct device_type rbd_device_type = {
1900 .groups = rbd_attr_groups,
1901 .release = rbd_sysfs_dev_release,
/*
 * sysfs show handler for a snapshot's "snap_size" attribute: recovers the
 * enclosing rbd_snap from the embedded device and writes snap->size as a
 * signed decimal number followed by a newline.
 */
1909 static ssize_t rbd_snap_size_show(struct device *dev,
1910 struct device_attribute *attr,
1913 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1915 return sprintf(buf, "%lld\n", (long long)snap->size);
/*
 * sysfs show handler for a snapshot's "snap_id" attribute: recovers the
 * enclosing rbd_snap from the embedded device and writes snap->id as a
 * signed decimal number followed by a newline.
 */
1918 static ssize_t rbd_snap_id_show(struct device *dev,
1919 struct device_attribute *attr,
1922 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1924 return sprintf(buf, "%lld\n", (long long)snap->id);
/* Read-only sysfs attributes exposed by each snapshot device. */
1927 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1928 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
/* Attribute table for a snapshot device: its size and id attributes. */
1930 static struct attribute *rbd_snap_attrs[] = {
1931 &dev_attr_snap_size.attr,
1932 &dev_attr_snap_id.attr,
/* Attribute group wrapping rbd_snap_attrs. */
1936 static struct attribute_group rbd_snap_attr_group = {
1937 .attrs = rbd_snap_attrs,
/*
 * Release callback for a snapshot device; recovers the enclosing rbd_snap
 * from the embedded struct device.
 * NOTE(review): presumably frees the snapshot and its name; the cleanup
 * statements are not visible here, confirm.
 */
1940 static void rbd_snap_dev_release(struct device *dev)
1942 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
/* Group list installed as rbd_snap_device_type.groups. */
1947 static const struct attribute_group *rbd_snap_attr_groups[] = {
1948 &rbd_snap_attr_group,
/*
 * Device type assigned to snapshot devices in rbd_register_snap_dev():
 * supplies the snapshot attribute groups and the release callback.
 */
1952 static struct device_type rbd_snap_device_type = {
1953 .groups = rbd_snap_attr_groups,
1954 .release = rbd_snap_dev_release,
/*
 * Drop one snapshot: unlink it from the list it is on and unregister its
 * device.  rbd_dev is not used by the visible statements.
 */
1957 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1958 struct rbd_snap *snap)
1960 list_del(&snap->node);
1961 device_unregister(&snap->dev);
/*
 * Register the device embedded in a snapshot under the given parent.
 *
 * The device is named "snap_<snapshot name>" and uses
 * rbd_snap_device_type.  The release callback is set both through the
 * device type and directly on the device.
 */
1964 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1965 struct rbd_snap *snap,
1966 struct device *parent)
1968 struct device *dev = &snap->dev;
1971 dev->type = &rbd_snap_device_type;
1972 dev->parent = parent;
1973 dev->release = rbd_snap_dev_release;
1974 dev_set_name(dev, "snap_%s", snap->name);
1975 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header snapshot index i and fill it in.
 *
 * @i:     index into header.snap_sizes[] and header.snapc->snaps[]
 * @name:  snapshot name; duplicated with kstrdup()
 * @snapp: NOTE(review): presumably receives the new snapshot; the
 *         assignment is not visible here, confirm.
 *
 * If the parent rbd device is already registered, the snapshot device is
 * registered immediately; otherwise registration is left to
 * rbd_bus_add_dev(), which walks rbd_dev->snaps.
 */
1980 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1981 int i, const char *name,
1982 struct rbd_snap **snapp)
1985 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1988 snap->name = kstrdup(name, GFP_KERNEL);
1989 snap->size = rbd_dev->header.snap_sizes[i];
1990 snap->id = rbd_dev->header.snapc->snaps[i];
1991 if (device_is_registered(&rbd_dev->dev)) {
1992 ret = rbd_register_snap_dev(rbd_dev, snap,
2006 * search for the previous snap in a null delimited string list
/*
 * Step from 'name' back to the preceding entry of the snapshot name buffer
 * that begins at 'start'.  The visible guard covers the case where fewer
 * than two bytes lie between 'start' and 'name'.
 * NOTE(review): the backward search itself is not visible here.
 */
2008 const char *rbd_prev_snap_name(const char *name, const char *start)
2010 if (name < start + 2)
2023 * compare the old list of snapshots that we have to what's in the header
2024 * and update it accordingly. Note that the header holds the snapshots
2025 * in reverse order (from newest to oldest) and we need to go from
2026 * oldest to newest so that we don't get a duplicate snap name while
2027 * processing (e.g., a snapshot was removed and a new one was created
2028 * with the same name).
/*
 * Reconcile rbd_dev->snaps with the snapshot arrays in rbd_dev->header.
 *
 * Both sequences are walked backwards: the existing list with
 * list_for_each_prev_safe(), the header arrays with i counting down from
 * total_snaps, and the name buffer with rbd_prev_snap_name() starting from
 * its end.  List entries missing from the header are removed, entries with
 * a matching id are kept, and header entries missing from the list are
 * added.
 */
2030 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2032 const char *name, *first_name;
2033 int i = rbd_dev->header.total_snaps;
2034 struct rbd_snap *snap, *old_snap = NULL;
2036 struct list_head *p, *n;
2038 first_name = rbd_dev->header.snap_names;
/* start one past the end of the name buffer and step backwards from there */
2039 name = first_name + rbd_dev->header.snap_names_len;
2041 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2044 old_snap = list_entry(p, struct rbd_snap, node);
2047 cur_id = rbd_dev->header.snapc->snaps[i - 1];
/* header exhausted (i == 0) or this id is older than the current header id */
2049 if (!i || old_snap->id < cur_id) {
2050 /* old_snap->id was skipped, thus was removed */
2051 __rbd_remove_snap_dev(rbd_dev, old_snap);
2054 if (old_snap->id == cur_id) {
2055 /* we have this snapshot already */
2057 name = rbd_prev_snap_name(name, first_name);
2061 i--, name = rbd_prev_snap_name(name, first_name)) {
/*
 * NOTE(review): cur_id is read from snaps[i] here, while the snapshot added
 * below uses index i - 1 and the read above uses snaps[i - 1]; confirm the
 * intended index.
 */
2066 cur_id = rbd_dev->header.snapc->snaps[i];
2067 /* snapshot removal? handle it above */
2068 if (cur_id >= old_snap->id)
2070 /* a new snapshot */
2071 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2075 /* note that we add it backward so using n and not p */
2076 list_add(&snap->node, n);
2080 /* we're done going over the old snap list, just add what's left */
2081 for (; i > 0; i--) {
2082 name = rbd_prev_snap_name(name, first_name);
2087 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
/* remaining entries go to the front of the list */
2090 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Release callback installed on rbd_root_dev.  The body is not visible
 * here.
 */
2097 static void rbd_root_dev_release(struct device *dev)
/*
 * Root device of the rbd hierarchy: used as the parent of every rbd device
 * in rbd_bus_add_dev() and registered in rbd_sysfs_init().
 */
2101 static struct device rbd_root_dev = {
2103 .release = rbd_root_dev_release,
/*
 * Register an rbd device on the rbd bus, then register a device for every
 * snapshot already on rbd_dev->snaps.  All of this runs under ctl_mutex.
 *
 * The device is named after rbd_dev->id, parented to rbd_root_dev, and
 * released through rbd_dev_release().
 */
2106 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2110 struct rbd_snap *snap;
2112 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2113 dev = &rbd_dev->dev;
2115 dev->bus = &rbd_bus_type;
2116 dev->type = &rbd_device_type;
2117 dev->parent = &rbd_root_dev;
2118 dev->release = rbd_dev_release;
2119 dev_set_name(dev, "%d", rbd_dev->id);
2120 ret = device_register(dev);
2124 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2125 ret = rbd_register_snap_dev(rbd_dev, snap,
/* two unlock sites are visible; presumably one per exit path */
2131 mutex_unlock(&ctl_mutex);
2134 mutex_unlock(&ctl_mutex);
/*
 * Unregister the rbd device from the driver core.  The device's release
 * callback is rbd_dev_release(), set in rbd_bus_add_dev().
 */
2138 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2140 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the image's metadata object.
 *
 * The watch is requested with the header's current obj_version.  When the
 * request returns -ERANGE, the snapshot state is refreshed under ctl_mutex
 * and the watch is retried; the loop ends on any other result.
 */
2143 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2148 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2149 rbd_dev->header.obj_version);
2150 if (ret == -ERANGE) {
2151 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2152 rc = __rbd_update_snaps(rbd_dev);
2153 mutex_unlock(&ctl_mutex);
2157 } while (ret == -ERANGE);
/*
 * Store handler for the bus-level "add" attribute: create and map a new
 * rbd device described by the text in buf.
 *
 * buf is parsed as whitespace-separated fields, in order: mon_dev_name,
 * options, pool name, object (image) name and an optional snapshot name.
 * Parsing fails unless at least the first four fields are present.
 *
 * Visible sequence: take a module reference, allocate scratch buffers and
 * the rbd_device, pick a unique id and link the device into rbd_dev_list
 * under ctl_mutex, parse the input, obtain a client, look up the pool id,
 * register a block major, register the device on the bus, set up the disk
 * and finally set up the watch.  The tail of the function holds the
 * unwinding code for the failure paths.
 */
2162 static ssize_t rbd_add(struct bus_type *bus,
2166 struct ceph_osd_client *osdc;
2167 struct rbd_device *rbd_dev;
2168 ssize_t rc = -ENOMEM;
2169 int irc, new_id = 0;
2170 struct list_head *tmp;
2174 if (!try_module_get(THIS_MODULE))
2177 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2181 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2185 /* new rbd_device object */
2186 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2190 /* static rbd_device initialization */
2191 spin_lock_init(&rbd_dev->lock);
2192 INIT_LIST_HEAD(&rbd_dev->node);
2193 INIT_LIST_HEAD(&rbd_dev->snaps);
2195 /* generate unique id: find highest unique id, add one */
2196 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2198 list_for_each(tmp, &rbd_dev_list) {
/* this inner rbd_dev shadows the function-level one for the loop body only */
2199 struct rbd_device *rbd_dev;
2201 rbd_dev = list_entry(tmp, struct rbd_device, node);
2202 if (rbd_dev->id >= new_id)
2203 new_id = rbd_dev->id + 1;
2206 rbd_dev->id = new_id;
2208 /* add to global list */
2209 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2211 /* parse add command */
/*
 * NOTE(review): a "%Ns" conversion stores up to N characters plus a
 * terminating NUL.  mon_dev_name and options are allocated with exactly
 * RBD_MAX_OPT_LEN bytes above, so a maximum-length field would write one
 * byte past the buffer.  The pool, object and snapshot buffers are declared
 * elsewhere; confirm they leave room for the terminator.
 */
2212 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2213 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2214 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2215 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2216 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2217 mon_dev_name, options, rbd_dev->pool_name,
2218 rbd_dev->obj, rbd_dev->snap_name) < 4) {
/* no snapshot given: fall back to "-", the value of RBD_SNAP_HEAD_NAME */
2223 if (rbd_dev->snap_name[0] == 0)
2224 rbd_dev->snap_name[0] = '-';
2226 rbd_dev->obj_len = strlen(rbd_dev->obj);
/* the metadata object is named after the image object plus RBD_SUFFIX */
2227 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2228 rbd_dev->obj, RBD_SUFFIX);
2230 /* initialize rest of new object */
2231 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2232 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2236 mutex_unlock(&ctl_mutex);
2239 osdc = &rbd_dev->client->osdc;
2240 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2242 goto err_out_client;
2243 rbd_dev->poolid = rc;
2245 /* register our block device */
/* major 0 requests a dynamically assigned major, which is returned in irc */
2246 irc = register_blkdev(0, rbd_dev->name);
2249 goto err_out_client;
2251 rbd_dev->major = irc;
2253 rc = rbd_bus_add_dev(rbd_dev);
2255 goto err_out_blkdev;
2257 /* set up and announce blkdev mapping */
2258 rc = rbd_init_disk(rbd_dev);
2262 rc = rbd_init_watch_dev(rbd_dev);
/*
 * Failure after the device was registered on the bus: unlink it from the
 * global list and let the bus removal release everything else.
 */
2269 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2270 list_del_init(&rbd_dev->node);
2271 mutex_unlock(&ctl_mutex);
2273 /* this will also clean up rest of rbd_dev stuff */
2275 rbd_bus_del_dev(rbd_dev);
2277 kfree(mon_dev_name);
/* failure before bus registration: undo each earlier step by hand */
2281 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2283 rbd_put_client(rbd_dev);
2284 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2286 list_del_init(&rbd_dev->node);
2287 mutex_unlock(&ctl_mutex);
2293 kfree(mon_dev_name);
2295 dout("Error adding device %s\n", buf);
2296 module_put(THIS_MODULE);
/*
 * Look up an rbd device by id with a linear scan of rbd_dev_list.
 * NOTE(review): rbd_remove() calls this with ctl_mutex held; presumably
 * that is required of every caller, confirm.
 */
2300 static struct rbd_device *__rbd_get_dev(unsigned long id)
2302 struct list_head *tmp;
2303 struct rbd_device *rbd_dev;
2305 list_for_each(tmp, &rbd_dev_list) {
2306 rbd_dev = list_entry(tmp, struct rbd_device, node);
2307 if (rbd_dev->id == id)
/*
 * Release callback for an rbd device, installed in rbd_bus_add_dev().
 *
 * Tears down, in order: the lingering watch request and the watch itself
 * (each only if set), the client reference, the disk, and the block major.
 * Finally it drops the module reference taken in rbd_add().
 */
2313 static void rbd_dev_release(struct device *dev)
2315 struct rbd_device *rbd_dev =
2316 container_of(dev, struct rbd_device, dev);
2318 if (rbd_dev->watch_request)
2319 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2320 rbd_dev->watch_request);
2321 if (rbd_dev->watch_event)
2322 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2324 rbd_put_client(rbd_dev);
2326 /* clean up and free blkdev */
2327 rbd_free_disk(rbd_dev);
2328 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2331 /* release module ref */
2332 module_put(THIS_MODULE);
/*
 * Store handler for the bus-level "remove" attribute: unmap the rbd device
 * whose id is given as a decimal number in buf.
 *
 * Under ctl_mutex the device is looked up, unlinked from rbd_dev_list, its
 * snapshots are removed, and it is unregistered from the bus.
 */
2335 static ssize_t rbd_remove(struct bus_type *bus,
2339 struct rbd_device *rbd_dev = NULL;
2344 rc = strict_strtoul(buf, 10, &ul);
2348 /* convert to int; abort if we lost anything in the conversion */
2349 target_id = (int) ul;
2350 if (target_id != ul)
2353 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2355 rbd_dev = __rbd_get_dev(target_id);
2361 list_del_init(&rbd_dev->node);
2363 __rbd_remove_all_snaps(rbd_dev);
2364 rbd_bus_del_dev(rbd_dev);
2367 mutex_unlock(&ctl_mutex);
/*
 * Store handler for the "create_snap" attribute: create a snapshot named
 * by the written text, refresh the local snapshot state, then notify other
 * watchers of the metadata object.
 */
2371 static ssize_t rbd_snap_add(struct device *dev,
2372 struct device_attribute *attr,
2376 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2378 char *name = kmalloc(count + 1, GFP_KERNEL);
/*
 * A size of 'count' copies at most count - 1 bytes, so the last byte of the
 * input is dropped (presumably the trailing newline of the write).
 */
2382 snprintf(name, count, "%s", buf);
2384 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2386 ret = rbd_header_add_snap(rbd_dev,
2391 ret = __rbd_update_snaps(rbd_dev);
2395 /* shouldn't hold ctl_mutex when notifying.. notify might
2396 trigger a watch callback that would need to get that mutex */
2397 mutex_unlock(&ctl_mutex);
2399 /* make a best effort, don't error if failed */
2400 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
/* second unlock site; presumably the error path */
2407 mutex_unlock(&ctl_mutex);
/*
 * Store handler for the "rollback_snap" attribute: roll the image back to
 * the snapshot named by the written text.
 *
 * Under ctl_mutex the snapshot id is resolved by name, every segment of
 * the image is rolled back in turn (a failure on a segment is logged with
 * pr_warning), and the snapshot state is refreshed afterwards.
 */
2412 static ssize_t rbd_snap_rollback(struct device *dev,
2413 struct device_attribute *attr,
2417 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2421 char *seg_name = NULL;
2422 char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2427 /* copy the snapshot name out of the write buffer */
/* as in rbd_snap_add(): size 'count' drops the last input byte */
2428 snprintf(snap_name, count, "%s", buf);
2429 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2433 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2435 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2439 dout("snapid=%lld\n", snapid);
/* walk the image segment by segment until the whole size is covered */
2442 while (cur_ofs < rbd_dev->header.image_size) {
2443 cur_ofs += rbd_get_segment(&rbd_dev->header,
2447 dout("seg_name=%s\n", seg_name);
2449 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2451 pr_warning("could not roll back obj %s err=%d\n",
2455 ret = __rbd_update_snaps(rbd_dev);
2462 mutex_unlock(&ctl_mutex);
/*
 * Bus-level control files: "add" maps a device via rbd_add(), "remove"
 * unmaps one via rbd_remove().  Both are write-only for the owner.
 */
2470 static struct bus_attribute rbd_bus_attrs[] = {
2471 __ATTR(add, S_IWUSR, NULL, rbd_add),
2472 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2477 * create control files in sysfs
/*
 * Register the rbd bus (with its add/remove attributes) and then the root
 * device that all rbd devices are parented to.
 */
2480 static int rbd_sysfs_init(void)
2484 rbd_bus_type.bus_attrs = rbd_bus_attrs;
2486 ret = bus_register(&rbd_bus_type);
2490 ret = device_register(&rbd_root_dev);
/* Undo rbd_sysfs_init() in reverse order: root device first, then the bus. */
2495 static void rbd_sysfs_cleanup(void)
2497 device_unregister(&rbd_root_dev);
2498 bus_unregister(&rbd_bus_type);
/*
 * Module entry point: set up the sysfs control files, initialize node_lock
 * and log that the driver has been loaded.
 */
2501 int __init rbd_init(void)
2505 rc = rbd_sysfs_init();
2508 spin_lock_init(&node_lock);
2509 pr_info("loaded " DRV_NAME_LONG "\n");
/* Module exit point: remove the sysfs control files. */
2513 void __exit rbd_exit(void)
2515 rbd_sysfs_cleanup();
/* Module entry/exit registration followed by author, description and license metadata. */
2518 module_init(rbd_init);
2519 module_exit(rbd_exit);
2521 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2522 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2523 MODULE_DESCRIPTION("rados block device");
2525 /* following authorship retained from original osdblk.c */
2526 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2528 MODULE_LICENSE("GPL");