]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
a8521338bf46b1b3ebc56bcc10ce374d57eafc15
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
98 struct rbd_image_header {
99         /* These four fields never change for a given rbd image */
100         char *object_prefix;
101         u64 features;
102         __u8 obj_order;
103         __u8 crypt_type;
104         __u8 comp_type;
105
106         /* The remaining fields need to be updated occasionally */
107         u64 image_size;
108         struct ceph_snap_context *snapc;
109         char *snap_names;
110         u64 *snap_sizes;
111
112         u64 obj_version;
113 };
114
115 /*
116  * An rbd image specification.
117  *
118  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
119  * identify an image.
120  */
121 struct rbd_spec {
122         u64             pool_id;
123         char            *pool_name;
124
125         char            *image_id;
126         size_t          image_id_len;
127         char            *image_name;
128         size_t          image_name_len;
129
130         u64             snap_id;
131         char            *snap_name;
132
133         struct kref     kref;
134 };
135
/*
 * Per-mapping options; filled in by parse_rbd_opts_token().
 */
struct rbd_options {
	bool	read_only;	/* set by "read_only"/"ro", cleared by "read_write"/"rw" */
};
139
140 /*
141  * an instance of the client.  multiple devices may share an rbd client.
142  */
143 struct rbd_client {
144         struct ceph_client      *client;
145         struct kref             kref;
146         struct list_head        node;
147 };
148
149 /*
150  * a request completion status
151  */
152 struct rbd_req_status {
153         int done;
154         int rc;
155         u64 bytes;
156 };
157
158 /*
159  * a collection of requests
160  */
161 struct rbd_req_coll {
162         int                     total;
163         int                     num_done;
164         struct kref             kref;
165         struct rbd_req_status   status[0];
166 };
167
168 /*
169  * a single io request
170  */
171 struct rbd_request {
172         struct request          *rq;            /* blk layer request */
173         struct bio              *bio;           /* cloned bio */
174         struct page             **pages;        /* list of used pages */
175         u64                     len;
176         int                     coll_index;
177         struct rbd_req_coll     *coll;
178 };
179
180 struct rbd_snap {
181         struct  device          dev;
182         const char              *name;
183         u64                     size;
184         struct list_head        node;
185         u64                     id;
186         u64                     features;
187 };
188
189 struct rbd_mapping {
190         u64                     size;
191         u64                     features;
192         bool                    read_only;
193 };
194
195 /*
196  * a single device
197  */
198 struct rbd_device {
199         int                     dev_id;         /* blkdev unique id */
200
201         int                     major;          /* blkdev assigned major */
202         struct gendisk          *disk;          /* blkdev's gendisk and rq */
203
204         u32                     image_format;   /* Either 1 or 2 */
205         struct rbd_client       *rbd_client;
206
207         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
208
209         spinlock_t              lock;           /* queue lock */
210
211         struct rbd_image_header header;
212         bool                    exists;
213         struct rbd_spec         *spec;
214
215         char                    *header_name;
216
217         struct ceph_osd_event   *watch_event;
218         struct ceph_osd_request *watch_request;
219
220         /* protects updating the header */
221         struct rw_semaphore     header_rwsem;
222
223         struct rbd_mapping      mapping;
224
225         struct list_head        node;
226
227         /* list of snapshots */
228         struct list_head        snaps;
229
230         /* sysfs related */
231         struct device           dev;
232 };
233
234 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
235
236 static LIST_HEAD(rbd_dev_list);    /* devices */
237 static DEFINE_SPINLOCK(rbd_dev_list_lock);
238
239 static LIST_HEAD(rbd_client_list);              /* clients */
240 static DEFINE_SPINLOCK(rbd_client_list_lock);
241
242 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
243 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
244
245 static void rbd_dev_release(struct device *dev);
246 static void rbd_remove_snap_dev(struct rbd_snap *snap);
247
248 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
249                        size_t count);
250 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
251                           size_t count);
252
253 static struct bus_attribute rbd_bus_attrs[] = {
254         __ATTR(add, S_IWUSR, NULL, rbd_add),
255         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
256         __ATTR_NULL
257 };
258
259 static struct bus_type rbd_bus_type = {
260         .name           = "rbd",
261         .bus_attrs      = rbd_bus_attrs,
262 };
263
/*
 * Release callback for rbd_root_dev.  That device is a statically
 * defined object in this file, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
	/* intentionally empty */
}
267
268 static struct device rbd_root_dev = {
269         .init_name =    "rbd",
270         .release =      rbd_root_dev_release,
271 };
272
/*
 * rbd_assert(expr) - log the failed expression and BUG() if expr is false.
 *
 * Only active when RBD_DEBUG is defined; otherwise it expands to a no-op
 * and expr is not evaluated.
 *
 * The active form is wrapped in do { } while (0) so that it behaves as
 * a single statement: a bare "if (...) { }" expansion would capture a
 * following "else" and leave a stray empty statement after the block.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
285
286 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
287 {
288         return get_device(&rbd_dev->dev);
289 }
290
291 static void rbd_put_dev(struct rbd_device *rbd_dev)
292 {
293         put_device(&rbd_dev->dev);
294 }
295
296 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
297 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
298
299 static int rbd_open(struct block_device *bdev, fmode_t mode)
300 {
301         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
302
303         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
304                 return -EROFS;
305
306         rbd_get_dev(rbd_dev);
307         set_device_ro(bdev, rbd_dev->mapping.read_only);
308
309         return 0;
310 }
311
312 static int rbd_release(struct gendisk *disk, fmode_t mode)
313 {
314         struct rbd_device *rbd_dev = disk->private_data;
315
316         rbd_put_dev(rbd_dev);
317
318         return 0;
319 }
320
321 static const struct block_device_operations rbd_bd_ops = {
322         .owner                  = THIS_MODULE,
323         .open                   = rbd_open,
324         .release                = rbd_release,
325 };
326
327 /*
328  * Initialize an rbd client instance.
329  * We own *ceph_opts.
330  */
331 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
332 {
333         struct rbd_client *rbdc;
334         int ret = -ENOMEM;
335
336         dout("rbd_client_create\n");
337         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
338         if (!rbdc)
339                 goto out_opt;
340
341         kref_init(&rbdc->kref);
342         INIT_LIST_HEAD(&rbdc->node);
343
344         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
345
346         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
347         if (IS_ERR(rbdc->client))
348                 goto out_mutex;
349         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
350
351         ret = ceph_open_session(rbdc->client);
352         if (ret < 0)
353                 goto out_err;
354
355         spin_lock(&rbd_client_list_lock);
356         list_add_tail(&rbdc->node, &rbd_client_list);
357         spin_unlock(&rbd_client_list_lock);
358
359         mutex_unlock(&ctl_mutex);
360
361         dout("rbd_client_create created %p\n", rbdc);
362         return rbdc;
363
364 out_err:
365         ceph_destroy_client(rbdc->client);
366 out_mutex:
367         mutex_unlock(&ctl_mutex);
368         kfree(rbdc);
369 out_opt:
370         if (ceph_opts)
371                 ceph_destroy_options(ceph_opts);
372         return ERR_PTR(ret);
373 }
374
375 /*
376  * Find a ceph client with specific addr and configuration.  If
377  * found, bump its reference count.
378  */
379 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
380 {
381         struct rbd_client *client_node;
382         bool found = false;
383
384         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
385                 return NULL;
386
387         spin_lock(&rbd_client_list_lock);
388         list_for_each_entry(client_node, &rbd_client_list, node) {
389                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
390                         kref_get(&client_node->kref);
391                         found = true;
392                         break;
393                 }
394         }
395         spin_unlock(&rbd_client_list_lock);
396
397         return found ? client_node : NULL;
398 }
399
400 /*
401  * mount options
402  */
403 enum {
404         Opt_last_int,
405         /* int args above */
406         Opt_last_string,
407         /* string args above */
408         Opt_read_only,
409         Opt_read_write,
410         /* Boolean args above */
411         Opt_last_bool,
412 };
413
414 static match_table_t rbd_opts_tokens = {
415         /* int args above */
416         /* string args above */
417         {Opt_read_only, "read_only"},
418         {Opt_read_only, "ro"},          /* Alternate spelling */
419         {Opt_read_write, "read_write"},
420         {Opt_read_write, "rw"},         /* Alternate spelling */
421         /* Boolean args above */
422         {-1, NULL}
423 };
424
425 static int parse_rbd_opts_token(char *c, void *private)
426 {
427         struct rbd_options *rbd_opts = private;
428         substring_t argstr[MAX_OPT_ARGS];
429         int token, intval, ret;
430
431         token = match_token(c, rbd_opts_tokens, argstr);
432         if (token < 0)
433                 return -EINVAL;
434
435         if (token < Opt_last_int) {
436                 ret = match_int(&argstr[0], &intval);
437                 if (ret < 0) {
438                         pr_err("bad mount option arg (not int) "
439                                "at '%s'\n", c);
440                         return ret;
441                 }
442                 dout("got int token %d val %d\n", token, intval);
443         } else if (token > Opt_last_int && token < Opt_last_string) {
444                 dout("got string token %d val %s\n", token,
445                      argstr[0].from);
446         } else if (token > Opt_last_string && token < Opt_last_bool) {
447                 dout("got Boolean token %d\n", token);
448         } else {
449                 dout("got token %d\n", token);
450         }
451
452         switch (token) {
453         case Opt_read_only:
454                 rbd_opts->read_only = true;
455                 break;
456         case Opt_read_write:
457                 rbd_opts->read_only = false;
458                 break;
459         default:
460                 rbd_assert(false);
461                 break;
462         }
463         return 0;
464 }
465
/*
 * Get an rbd client matching @ceph_opts, creating one if none exists.
 *
 * @ceph_opts is consumed either way: it is destroyed here when an
 * existing client is reused, and handed to rbd_client_create()
 * otherwise.  The result is whatever rbd_client_find() or
 * rbd_client_create() returned.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (!rbdc)
		return rbd_client_create(ceph_opts);

	/* using an existing client */
	ceph_destroy_options(ceph_opts);

	return rbdc;
}
482
483 /*
484  * Destroy ceph client
485  *
486  * Caller must hold rbd_client_list_lock.
487  */
488 static void rbd_client_release(struct kref *kref)
489 {
490         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
491
492         dout("rbd_release_client %p\n", rbdc);
493         spin_lock(&rbd_client_list_lock);
494         list_del(&rbdc->node);
495         spin_unlock(&rbd_client_list_lock);
496
497         ceph_destroy_client(rbdc->client);
498         kfree(rbdc);
499 }
500
501 /*
502  * Drop reference to ceph client node. If it's not referenced anymore, release
503  * it.
504  */
505 static void rbd_put_client(struct rbd_client *rbdc)
506 {
507         if (rbdc)
508                 kref_put(&rbdc->kref, rbd_client_release);
509 }
510
511 /*
512  * Destroy requests collection
513  */
514 static void rbd_coll_release(struct kref *kref)
515 {
516         struct rbd_req_coll *coll =
517                 container_of(kref, struct rbd_req_coll, kref);
518
519         dout("rbd_coll_release %p\n", coll);
520         kfree(coll);
521 }
522
523 static bool rbd_image_format_valid(u32 image_format)
524 {
525         return image_format == 1 || image_format == 2;
526 }
527
528 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
529 {
530         size_t size;
531         u32 snap_count;
532
533         /* The header has to start with the magic rbd header text */
534         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
535                 return false;
536
537         /* The bio layer requires at least sector-sized I/O */
538
539         if (ondisk->options.order < SECTOR_SHIFT)
540                 return false;
541
542         /* If we use u64 in a few spots we may be able to loosen this */
543
544         if (ondisk->options.order > 8 * sizeof (int) - 1)
545                 return false;
546
547         /*
548          * The size of a snapshot header has to fit in a size_t, and
549          * that limits the number of snapshots.
550          */
551         snap_count = le32_to_cpu(ondisk->snap_count);
552         size = SIZE_MAX - sizeof (struct ceph_snap_context);
553         if (snap_count > size / sizeof (__le64))
554                 return false;
555
556         /*
557          * Not only that, but the size of the entire the snapshot
558          * header must also be representable in a size_t.
559          */
560         size -= snap_count * sizeof (__le64);
561         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
562                 return false;
563
564         return true;
565 }
566
567 /*
568  * Create a new header structure, translate header format from the on-disk
569  * header.
570  */
571 static int rbd_header_from_disk(struct rbd_image_header *header,
572                                  struct rbd_image_header_ondisk *ondisk)
573 {
574         u32 snap_count;
575         size_t len;
576         size_t size;
577         u32 i;
578
579         memset(header, 0, sizeof (*header));
580
581         snap_count = le32_to_cpu(ondisk->snap_count);
582
583         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
584         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
585         if (!header->object_prefix)
586                 return -ENOMEM;
587         memcpy(header->object_prefix, ondisk->object_prefix, len);
588         header->object_prefix[len] = '\0';
589
590         if (snap_count) {
591                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
592
593                 /* Save a copy of the snapshot names */
594
595                 if (snap_names_len > (u64) SIZE_MAX)
596                         return -EIO;
597                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
598                 if (!header->snap_names)
599                         goto out_err;
600                 /*
601                  * Note that rbd_dev_v1_header_read() guarantees
602                  * the ondisk buffer we're working with has
603                  * snap_names_len bytes beyond the end of the
604                  * snapshot id array, this memcpy() is safe.
605                  */
606                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
607                         snap_names_len);
608
609                 /* Record each snapshot's size */
610
611                 size = snap_count * sizeof (*header->snap_sizes);
612                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
613                 if (!header->snap_sizes)
614                         goto out_err;
615                 for (i = 0; i < snap_count; i++)
616                         header->snap_sizes[i] =
617                                 le64_to_cpu(ondisk->snaps[i].image_size);
618         } else {
619                 WARN_ON(ondisk->snap_names_len);
620                 header->snap_names = NULL;
621                 header->snap_sizes = NULL;
622         }
623
624         header->features = 0;   /* No features support in v1 images */
625         header->obj_order = ondisk->options.order;
626         header->crypt_type = ondisk->options.crypt_type;
627         header->comp_type = ondisk->options.comp_type;
628
629         /* Allocate and fill in the snapshot context */
630
631         header->image_size = le64_to_cpu(ondisk->image_size);
632         size = sizeof (struct ceph_snap_context);
633         size += snap_count * sizeof (header->snapc->snaps[0]);
634         header->snapc = kzalloc(size, GFP_KERNEL);
635         if (!header->snapc)
636                 goto out_err;
637
638         atomic_set(&header->snapc->nref, 1);
639         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
640         header->snapc->num_snaps = snap_count;
641         for (i = 0; i < snap_count; i++)
642                 header->snapc->snaps[i] =
643                         le64_to_cpu(ondisk->snaps[i].id);
644
645         return 0;
646
647 out_err:
648         kfree(header->snap_sizes);
649         header->snap_sizes = NULL;
650         kfree(header->snap_names);
651         header->snap_names = NULL;
652         kfree(header->object_prefix);
653         header->object_prefix = NULL;
654
655         return -ENOMEM;
656 }
657
658 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
659 {
660
661         struct rbd_snap *snap;
662
663         list_for_each_entry(snap, &rbd_dev->snaps, node) {
664                 if (!strcmp(snap_name, snap->name)) {
665                         rbd_dev->spec->snap_id = snap->id;
666                         rbd_dev->mapping.size = snap->size;
667                         rbd_dev->mapping.features = snap->features;
668
669                         return 0;
670                 }
671         }
672
673         return -ENOENT;
674 }
675
676 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
677 {
678         int ret;
679
680         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
681                     sizeof (RBD_SNAP_HEAD_NAME))) {
682                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
683                 rbd_dev->mapping.size = rbd_dev->header.image_size;
684                 rbd_dev->mapping.features = rbd_dev->header.features;
685                 ret = 0;
686         } else {
687                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
688                 if (ret < 0)
689                         goto done;
690                 rbd_dev->mapping.read_only = true;
691         }
692         rbd_dev->exists = true;
693 done:
694         return ret;
695 }
696
697 static void rbd_header_free(struct rbd_image_header *header)
698 {
699         kfree(header->object_prefix);
700         header->object_prefix = NULL;
701         kfree(header->snap_sizes);
702         header->snap_sizes = NULL;
703         kfree(header->snap_names);
704         header->snap_names = NULL;
705         ceph_put_snap_context(header->snapc);
706         header->snapc = NULL;
707 }
708
709 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
710 {
711         char *name;
712         u64 segment;
713         int ret;
714
715         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
716         if (!name)
717                 return NULL;
718         segment = offset >> rbd_dev->header.obj_order;
719         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
720                         rbd_dev->header.object_prefix, segment);
721         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
722                 pr_err("error formatting segment name for #%llu (%d)\n",
723                         segment, ret);
724                 kfree(name);
725                 name = NULL;
726         }
727
728         return name;
729 }
730
731 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
732 {
733         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
734
735         return offset & (segment_size - 1);
736 }
737
738 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
739                                 u64 offset, u64 length)
740 {
741         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
742
743         offset &= segment_size - 1;
744
745         rbd_assert(length <= U64_MAX - offset);
746         if (offset + length > segment_size)
747                 length = segment_size - offset;
748
749         return length;
750 }
751
752 static int rbd_get_num_segments(struct rbd_image_header *header,
753                                 u64 ofs, u64 len)
754 {
755         u64 start_seg;
756         u64 end_seg;
757
758         if (!len)
759                 return 0;
760         if (len - 1 > U64_MAX - ofs)
761                 return -ERANGE;
762
763         start_seg = ofs >> header->obj_order;
764         end_seg = (ofs + len - 1) >> header->obj_order;
765
766         return end_seg - start_seg + 1;
767 }
768
769 /*
770  * returns the size of an object in the image
771  */
772 static u64 rbd_obj_bytes(struct rbd_image_header *header)
773 {
774         return 1 << header->obj_order;
775 }
776
777 /*
778  * bio helpers
779  */
780
781 static void bio_chain_put(struct bio *chain)
782 {
783         struct bio *tmp;
784
785         while (chain) {
786                 tmp = chain;
787                 chain = chain->bi_next;
788                 bio_put(tmp);
789         }
790 }
791
792 /*
793  * zeros a bio chain, starting at specific offset
794  */
795 static void zero_bio_chain(struct bio *chain, int start_ofs)
796 {
797         struct bio_vec *bv;
798         unsigned long flags;
799         void *buf;
800         int i;
801         int pos = 0;
802
803         while (chain) {
804                 bio_for_each_segment(bv, chain, i) {
805                         if (pos + bv->bv_len > start_ofs) {
806                                 int remainder = max(start_ofs - pos, 0);
807                                 buf = bvec_kmap_irq(bv, &flags);
808                                 memset(buf + remainder, 0,
809                                        bv->bv_len - remainder);
810                                 bvec_kunmap_irq(buf, &flags);
811                         }
812                         pos += bv->bv_len;
813                 }
814
815                 chain = chain->bi_next;
816         }
817 }
818
819 /*
820  * Clone a portion of a bio, starting at the given byte offset
821  * and continuing for the number of bytes indicated.
822  */
823 static struct bio *bio_clone_range(struct bio *bio_src,
824                                         unsigned int offset,
825                                         unsigned int len,
826                                         gfp_t gfpmask)
827 {
828         struct bio_vec *bv;
829         unsigned int resid;
830         unsigned short idx;
831         unsigned int voff;
832         unsigned short end_idx;
833         unsigned short vcnt;
834         struct bio *bio;
835
836         /* Handle the easy case for the caller */
837
838         if (!offset && len == bio_src->bi_size)
839                 return bio_clone(bio_src, gfpmask);
840
841         if (WARN_ON_ONCE(!len))
842                 return NULL;
843         if (WARN_ON_ONCE(len > bio_src->bi_size))
844                 return NULL;
845         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
846                 return NULL;
847
848         /* Find first affected segment... */
849
850         resid = offset;
851         __bio_for_each_segment(bv, bio_src, idx, 0) {
852                 if (resid < bv->bv_len)
853                         break;
854                 resid -= bv->bv_len;
855         }
856         voff = resid;
857
858         /* ...and the last affected segment */
859
860         resid += len;
861         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
862                 if (resid <= bv->bv_len)
863                         break;
864                 resid -= bv->bv_len;
865         }
866         vcnt = end_idx - idx + 1;
867
868         /* Build the clone */
869
870         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
871         if (!bio)
872                 return NULL;    /* ENOMEM */
873
874         bio->bi_bdev = bio_src->bi_bdev;
875         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
876         bio->bi_rw = bio_src->bi_rw;
877         bio->bi_flags |= 1 << BIO_CLONED;
878
879         /*
880          * Copy over our part of the bio_vec, then update the first
881          * and last (or only) entries.
882          */
883         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
884                         vcnt * sizeof (struct bio_vec));
885         bio->bi_io_vec[0].bv_offset += voff;
886         if (vcnt > 1) {
887                 bio->bi_io_vec[0].bv_len -= voff;
888                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
889         } else {
890                 bio->bi_io_vec[0].bv_len = len;
891         }
892
893         bio->bi_vcnt = vcnt;
894         bio->bi_size = len;
895         bio->bi_idx = 0;
896
897         return bio;
898 }
899
900 /*
901  * Clone a portion of a bio chain, starting at the given byte offset
902  * into the first bio in the source chain and continuing for the
903  * number of bytes indicated.  The result is another bio chain of
904  * exactly the given length, or a null pointer on error.
905  *
906  * The bio_src and offset parameters are both in-out.  On entry they
907  * refer to the first source bio and the offset into that bio where
908  * the start of data to be cloned is located.
909  *
910  * On return, bio_src is updated to refer to the bio in the source
911  * chain that contains first un-cloned byte, and *offset will
912  * contain the offset of that byte within that bio.
913  */
914 static struct bio *bio_chain_clone_range(struct bio **bio_src,
915                                         unsigned int *offset,
916                                         unsigned int len,
917                                         gfp_t gfpmask)
918 {
919         struct bio *bi = *bio_src;
920         unsigned int off = *offset;
921         struct bio *chain = NULL;
922         struct bio **end;
923
924         /* Build up a chain of clone bios up to the limit */
925
926         if (!bi || off >= bi->bi_size || !len)
927                 return NULL;            /* Nothing to clone */
928
929         end = &chain;
930         while (len) {
931                 unsigned int bi_size;
932                 struct bio *bio;
933
934                 if (!bi)
935                         goto out_err;   /* EINVAL; ran out of bio's */
936                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
937                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
938                 if (!bio)
939                         goto out_err;   /* ENOMEM */
940
941                 *end = bio;
942                 end = &bio->bi_next;
943
944                 off += bi_size;
945                 if (off == bi->bi_size) {
946                         bi = bi->bi_next;
947                         off = 0;
948                 }
949                 len -= bi_size;
950         }
951         *bio_src = bi;
952         *offset = off;
953
954         return chain;
955 out_err:
956         bio_chain_put(chain);
957
958         return NULL;
959 }
960
961 /*
962  * helpers for osd request op vectors.
963  */
964 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
965                                         int opcode, u32 payload_len)
966 {
967         struct ceph_osd_req_op *ops;
968
969         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
970         if (!ops)
971                 return NULL;
972
973         ops[0].op = opcode;
974
975         /*
976          * op extent offset and length will be set later on
977          * in calc_raw_layout()
978          */
979         ops[0].payload_len = payload_len;
980
981         return ops;
982 }
983
/* Release an op vector; a NULL vector is accepted since kfree() is. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
988
989 static void rbd_coll_end_req_index(struct request *rq,
990                                    struct rbd_req_coll *coll,
991                                    int index,
992                                    int ret, u64 len)
993 {
994         struct request_queue *q;
995         int min, max, i;
996
997         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
998              coll, index, ret, (unsigned long long) len);
999
1000         if (!rq)
1001                 return;
1002
1003         if (!coll) {
1004                 blk_end_request(rq, ret, len);
1005                 return;
1006         }
1007
1008         q = rq->q;
1009
1010         spin_lock_irq(q->queue_lock);
1011         coll->status[index].done = 1;
1012         coll->status[index].rc = ret;
1013         coll->status[index].bytes = len;
1014         max = min = coll->num_done;
1015         while (max < coll->total && coll->status[max].done)
1016                 max++;
1017
1018         for (i = min; i<max; i++) {
1019                 __blk_end_request(rq, coll->status[i].rc,
1020                                   coll->status[i].bytes);
1021                 coll->num_done++;
1022                 kref_put(&coll->kref, rbd_coll_release);
1023         }
1024         spin_unlock_irq(q->queue_lock);
1025 }
1026
1027 static void rbd_coll_end_req(struct rbd_request *req,
1028                              int ret, u64 len)
1029 {
1030         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1031 }
1032
1033 /*
1034  * Send ceph osd request
1035  */
1036 static int rbd_do_request(struct request *rq,
1037                           struct rbd_device *rbd_dev,
1038                           struct ceph_snap_context *snapc,
1039                           u64 snapid,
1040                           const char *object_name, u64 ofs, u64 len,
1041                           struct bio *bio,
1042                           struct page **pages,
1043                           int num_pages,
1044                           int flags,
1045                           struct ceph_osd_req_op *ops,
1046                           struct rbd_req_coll *coll,
1047                           int coll_index,
1048                           void (*rbd_cb)(struct ceph_osd_request *req,
1049                                          struct ceph_msg *msg),
1050                           struct ceph_osd_request **linger_req,
1051                           u64 *ver)
1052 {
1053         struct ceph_osd_request *req;
1054         struct ceph_file_layout *layout;
1055         int ret;
1056         u64 bno;
1057         struct timespec mtime = CURRENT_TIME;
1058         struct rbd_request *req_data;
1059         struct ceph_osd_request_head *reqhead;
1060         struct ceph_osd_client *osdc;
1061
1062         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1063         if (!req_data) {
1064                 if (coll)
1065                         rbd_coll_end_req_index(rq, coll, coll_index,
1066                                                -ENOMEM, len);
1067                 return -ENOMEM;
1068         }
1069
1070         if (coll) {
1071                 req_data->coll = coll;
1072                 req_data->coll_index = coll_index;
1073         }
1074
1075         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1076                 object_name, (unsigned long long) ofs,
1077                 (unsigned long long) len, coll, coll_index);
1078
1079         osdc = &rbd_dev->rbd_client->client->osdc;
1080         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1081                                         false, GFP_NOIO, pages, bio);
1082         if (!req) {
1083                 ret = -ENOMEM;
1084                 goto done_pages;
1085         }
1086
1087         req->r_callback = rbd_cb;
1088
1089         req_data->rq = rq;
1090         req_data->bio = bio;
1091         req_data->pages = pages;
1092         req_data->len = len;
1093
1094         req->r_priv = req_data;
1095
1096         reqhead = req->r_request->front.iov_base;
1097         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1098
1099         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1100         req->r_oid_len = strlen(req->r_oid);
1101
1102         layout = &req->r_file_layout;
1103         memset(layout, 0, sizeof(*layout));
1104         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1105         layout->fl_stripe_count = cpu_to_le32(1);
1106         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1107         layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
1108         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1109                                    req, ops);
1110         rbd_assert(ret == 0);
1111
1112         ceph_osdc_build_request(req, ofs, &len,
1113                                 ops,
1114                                 snapc,
1115                                 &mtime,
1116                                 req->r_oid, req->r_oid_len);
1117
1118         if (linger_req) {
1119                 ceph_osdc_set_request_linger(osdc, req);
1120                 *linger_req = req;
1121         }
1122
1123         ret = ceph_osdc_start_request(osdc, req, false);
1124         if (ret < 0)
1125                 goto done_err;
1126
1127         if (!rbd_cb) {
1128                 ret = ceph_osdc_wait_request(osdc, req);
1129                 if (ver)
1130                         *ver = le64_to_cpu(req->r_reassert_version.version);
1131                 dout("reassert_ver=%llu\n",
1132                         (unsigned long long)
1133                                 le64_to_cpu(req->r_reassert_version.version));
1134                 ceph_osdc_put_request(req);
1135         }
1136         return ret;
1137
1138 done_err:
1139         bio_chain_put(req_data->bio);
1140         ceph_osdc_put_request(req);
1141 done_pages:
1142         rbd_coll_end_req(req_data, ret, len);
1143         kfree(req_data);
1144         return ret;
1145 }
1146
1147 /*
1148  * Ceph osd op callback
1149  */
1150 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1151 {
1152         struct rbd_request *req_data = req->r_priv;
1153         struct ceph_osd_reply_head *replyhead;
1154         struct ceph_osd_op *op;
1155         __s32 rc;
1156         u64 bytes;
1157         int read_op;
1158
1159         /* parse reply */
1160         replyhead = msg->front.iov_base;
1161         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1162         op = (void *)(replyhead + 1);
1163         rc = le32_to_cpu(replyhead->result);
1164         bytes = le64_to_cpu(op->extent.length);
1165         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1166
1167         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1168                 (unsigned long long) bytes, read_op, (int) rc);
1169
1170         if (rc == -ENOENT && read_op) {
1171                 zero_bio_chain(req_data->bio, 0);
1172                 rc = 0;
1173         } else if (rc == 0 && read_op && bytes < req_data->len) {
1174                 zero_bio_chain(req_data->bio, bytes);
1175                 bytes = req_data->len;
1176         }
1177
1178         rbd_coll_end_req(req_data, rc, bytes);
1179
1180         if (req_data->bio)
1181                 bio_chain_put(req_data->bio);
1182
1183         ceph_osdc_put_request(req);
1184         kfree(req_data);
1185 }
1186
1187 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1188 {
1189         ceph_osdc_put_request(req);
1190 }
1191
1192 /*
1193  * Do a synchronous ceph osd operation
1194  */
1195 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1196                            struct ceph_snap_context *snapc,
1197                            u64 snapid,
1198                            int flags,
1199                            struct ceph_osd_req_op *ops,
1200                            const char *object_name,
1201                            u64 ofs, u64 inbound_size,
1202                            char *inbound,
1203                            struct ceph_osd_request **linger_req,
1204                            u64 *ver)
1205 {
1206         int ret;
1207         struct page **pages;
1208         int num_pages;
1209
1210         rbd_assert(ops != NULL);
1211
1212         num_pages = calc_pages_for(ofs, inbound_size);
1213         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1214         if (IS_ERR(pages))
1215                 return PTR_ERR(pages);
1216
1217         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1218                           object_name, ofs, inbound_size, NULL,
1219                           pages, num_pages,
1220                           flags,
1221                           ops,
1222                           NULL, 0,
1223                           NULL,
1224                           linger_req, ver);
1225         if (ret < 0)
1226                 goto done;
1227
1228         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1229                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1230
1231 done:
1232         ceph_release_page_vector(pages, num_pages);
1233         return ret;
1234 }
1235
1236 /*
1237  * Do an asynchronous ceph osd operation
1238  */
1239 static int rbd_do_op(struct request *rq,
1240                      struct rbd_device *rbd_dev,
1241                      struct ceph_snap_context *snapc,
1242                      u64 ofs, u64 len,
1243                      struct bio *bio,
1244                      struct rbd_req_coll *coll,
1245                      int coll_index)
1246 {
1247         char *seg_name;
1248         u64 seg_ofs;
1249         u64 seg_len;
1250         int ret;
1251         struct ceph_osd_req_op *ops;
1252         u32 payload_len;
1253         int opcode;
1254         int flags;
1255         u64 snapid;
1256
1257         seg_name = rbd_segment_name(rbd_dev, ofs);
1258         if (!seg_name)
1259                 return -ENOMEM;
1260         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1261         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1262
1263         if (rq_data_dir(rq) == WRITE) {
1264                 opcode = CEPH_OSD_OP_WRITE;
1265                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1266                 snapid = CEPH_NOSNAP;
1267                 payload_len = seg_len;
1268         } else {
1269                 opcode = CEPH_OSD_OP_READ;
1270                 flags = CEPH_OSD_FLAG_READ;
1271                 snapc = NULL;
1272                 snapid = rbd_dev->spec->snap_id;
1273                 payload_len = 0;
1274         }
1275
1276         ret = -ENOMEM;
1277         ops = rbd_create_rw_ops(1, opcode, payload_len);
1278         if (!ops)
1279                 goto done;
1280
1281         /* we've taken care of segment sizes earlier when we
1282            cloned the bios. We should never have a segment
1283            truncated at this point */
1284         rbd_assert(seg_len == len);
1285
1286         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1287                              seg_name, seg_ofs, seg_len,
1288                              bio,
1289                              NULL, 0,
1290                              flags,
1291                              ops,
1292                              coll, coll_index,
1293                              rbd_req_cb, 0, NULL);
1294
1295         rbd_destroy_ops(ops);
1296 done:
1297         kfree(seg_name);
1298         return ret;
1299 }
1300
1301 /*
1302  * Request sync osd read
1303  */
1304 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1305                           u64 snapid,
1306                           const char *object_name,
1307                           u64 ofs, u64 len,
1308                           char *buf,
1309                           u64 *ver)
1310 {
1311         struct ceph_osd_req_op *ops;
1312         int ret;
1313
1314         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1315         if (!ops)
1316                 return -ENOMEM;
1317
1318         ret = rbd_req_sync_op(rbd_dev, NULL,
1319                                snapid,
1320                                CEPH_OSD_FLAG_READ,
1321                                ops, object_name, ofs, len, buf, NULL, ver);
1322         rbd_destroy_ops(ops);
1323
1324         return ret;
1325 }
1326
1327 /*
1328  * Request sync osd watch
1329  */
1330 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1331                                    u64 ver,
1332                                    u64 notify_id)
1333 {
1334         struct ceph_osd_req_op *ops;
1335         int ret;
1336
1337         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1338         if (!ops)
1339                 return -ENOMEM;
1340
1341         ops[0].watch.ver = cpu_to_le64(ver);
1342         ops[0].watch.cookie = notify_id;
1343         ops[0].watch.flag = 0;
1344
1345         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1346                           rbd_dev->header_name, 0, 0, NULL,
1347                           NULL, 0,
1348                           CEPH_OSD_FLAG_READ,
1349                           ops,
1350                           NULL, 0,
1351                           rbd_simple_req_cb, 0, NULL);
1352
1353         rbd_destroy_ops(ops);
1354         return ret;
1355 }
1356
1357 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1358 {
1359         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1360         u64 hver;
1361         int rc;
1362
1363         if (!rbd_dev)
1364                 return;
1365
1366         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1367                 rbd_dev->header_name, (unsigned long long) notify_id,
1368                 (unsigned int) opcode);
1369         rc = rbd_dev_refresh(rbd_dev, &hver);
1370         if (rc)
1371                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1372                            " update snaps: %d\n", rbd_dev->major, rc);
1373
1374         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1375 }
1376
1377 /*
1378  * Request sync osd watch
1379  */
1380 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1381 {
1382         struct ceph_osd_req_op *ops;
1383         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1384         int ret;
1385
1386         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1387         if (!ops)
1388                 return -ENOMEM;
1389
1390         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1391                                      (void *)rbd_dev, &rbd_dev->watch_event);
1392         if (ret < 0)
1393                 goto fail;
1394
1395         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1396         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1397         ops[0].watch.flag = 1;
1398
1399         ret = rbd_req_sync_op(rbd_dev, NULL,
1400                               CEPH_NOSNAP,
1401                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1402                               ops,
1403                               rbd_dev->header_name,
1404                               0, 0, NULL,
1405                               &rbd_dev->watch_request, NULL);
1406
1407         if (ret < 0)
1408                 goto fail_event;
1409
1410         rbd_destroy_ops(ops);
1411         return 0;
1412
1413 fail_event:
1414         ceph_osdc_cancel_event(rbd_dev->watch_event);
1415         rbd_dev->watch_event = NULL;
1416 fail:
1417         rbd_destroy_ops(ops);
1418         return ret;
1419 }
1420
1421 /*
1422  * Request sync osd unwatch
1423  */
1424 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1425 {
1426         struct ceph_osd_req_op *ops;
1427         int ret;
1428
1429         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1430         if (!ops)
1431                 return -ENOMEM;
1432
1433         ops[0].watch.ver = 0;
1434         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1435         ops[0].watch.flag = 0;
1436
1437         ret = rbd_req_sync_op(rbd_dev, NULL,
1438                               CEPH_NOSNAP,
1439                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1440                               ops,
1441                               rbd_dev->header_name,
1442                               0, 0, NULL, NULL, NULL);
1443
1444
1445         rbd_destroy_ops(ops);
1446         ceph_osdc_cancel_event(rbd_dev->watch_event);
1447         rbd_dev->watch_event = NULL;
1448         return ret;
1449 }
1450
1451 /*
1452  * Synchronous osd object method call
1453  */
1454 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1455                              const char *object_name,
1456                              const char *class_name,
1457                              const char *method_name,
1458                              const char *outbound,
1459                              size_t outbound_size,
1460                              char *inbound,
1461                              size_t inbound_size,
1462                              int flags,
1463                              u64 *ver)
1464 {
1465         struct ceph_osd_req_op *ops;
1466         int class_name_len = strlen(class_name);
1467         int method_name_len = strlen(method_name);
1468         int payload_size;
1469         int ret;
1470
1471         /*
1472          * Any input parameters required by the method we're calling
1473          * will be sent along with the class and method names as
1474          * part of the message payload.  That data and its size are
1475          * supplied via the indata and indata_len fields (named from
1476          * the perspective of the server side) in the OSD request
1477          * operation.
1478          */
1479         payload_size = class_name_len + method_name_len + outbound_size;
1480         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1481         if (!ops)
1482                 return -ENOMEM;
1483
1484         ops[0].cls.class_name = class_name;
1485         ops[0].cls.class_len = (__u8) class_name_len;
1486         ops[0].cls.method_name = method_name;
1487         ops[0].cls.method_len = (__u8) method_name_len;
1488         ops[0].cls.argc = 0;
1489         ops[0].cls.indata = outbound;
1490         ops[0].cls.indata_len = outbound_size;
1491
1492         ret = rbd_req_sync_op(rbd_dev, NULL,
1493                                CEPH_NOSNAP,
1494                                flags, ops,
1495                                object_name, 0, inbound_size, inbound,
1496                                NULL, ver);
1497
1498         rbd_destroy_ops(ops);
1499
1500         dout("cls_exec returned %d\n", ret);
1501         return ret;
1502 }
1503
1504 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1505 {
1506         struct rbd_req_coll *coll =
1507                         kzalloc(sizeof(struct rbd_req_coll) +
1508                                 sizeof(struct rbd_req_status) * num_reqs,
1509                                 GFP_ATOMIC);
1510
1511         if (!coll)
1512                 return NULL;
1513         coll->total = num_reqs;
1514         kref_init(&coll->kref);
1515         return coll;
1516 }
1517
1518 /*
1519  * block device queue callback
1520  */
1521 static void rbd_rq_fn(struct request_queue *q)
1522 {
1523         struct rbd_device *rbd_dev = q->queuedata;
1524         struct request *rq;
1525
1526         while ((rq = blk_fetch_request(q))) {
1527                 struct bio *bio;
1528                 bool do_write;
1529                 unsigned int size;
1530                 u64 ofs;
1531                 int num_segs, cur_seg = 0;
1532                 struct rbd_req_coll *coll;
1533                 struct ceph_snap_context *snapc;
1534                 unsigned int bio_offset;
1535
1536                 dout("fetched request\n");
1537
1538                 /* filter out block requests we don't understand */
1539                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1540                         __blk_end_request_all(rq, 0);
1541                         continue;
1542                 }
1543
1544                 /* deduce our operation (read, write) */
1545                 do_write = (rq_data_dir(rq) == WRITE);
1546                 if (do_write && rbd_dev->mapping.read_only) {
1547                         __blk_end_request_all(rq, -EROFS);
1548                         continue;
1549                 }
1550
1551                 spin_unlock_irq(q->queue_lock);
1552
1553                 down_read(&rbd_dev->header_rwsem);
1554
1555                 if (!rbd_dev->exists) {
1556                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1557                         up_read(&rbd_dev->header_rwsem);
1558                         dout("request for non-existent snapshot");
1559                         spin_lock_irq(q->queue_lock);
1560                         __blk_end_request_all(rq, -ENXIO);
1561                         continue;
1562                 }
1563
1564                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1565
1566                 up_read(&rbd_dev->header_rwsem);
1567
1568                 size = blk_rq_bytes(rq);
1569                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1570                 bio = rq->bio;
1571
1572                 dout("%s 0x%x bytes at 0x%llx\n",
1573                      do_write ? "write" : "read",
1574                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1575
1576                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1577                 if (num_segs <= 0) {
1578                         spin_lock_irq(q->queue_lock);
1579                         __blk_end_request_all(rq, num_segs);
1580                         ceph_put_snap_context(snapc);
1581                         continue;
1582                 }
1583                 coll = rbd_alloc_coll(num_segs);
1584                 if (!coll) {
1585                         spin_lock_irq(q->queue_lock);
1586                         __blk_end_request_all(rq, -ENOMEM);
1587                         ceph_put_snap_context(snapc);
1588                         continue;
1589                 }
1590
1591                 bio_offset = 0;
1592                 do {
1593                         u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1594                         unsigned int chain_size;
1595                         struct bio *bio_chain;
1596
1597                         BUG_ON(limit > (u64) UINT_MAX);
1598                         chain_size = (unsigned int) limit;
1599                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1600
1601                         kref_get(&coll->kref);
1602
1603                         /* Pass a cloned bio chain via an osd request */
1604
1605                         bio_chain = bio_chain_clone_range(&bio,
1606                                                 &bio_offset, chain_size,
1607                                                 GFP_ATOMIC);
1608                         if (bio_chain)
1609                                 (void) rbd_do_op(rq, rbd_dev, snapc,
1610                                                 ofs, chain_size,
1611                                                 bio_chain, coll, cur_seg);
1612                         else
1613                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1614                                                        -ENOMEM, chain_size);
1615                         size -= chain_size;
1616                         ofs += chain_size;
1617
1618                         cur_seg++;
1619                 } while (size > 0);
1620                 kref_put(&coll->kref, rbd_coll_release);
1621
1622                 spin_lock_irq(q->queue_lock);
1623
1624                 ceph_put_snap_context(snapc);
1625         }
1626 }
1627
1628 /*
1629  * a queue callback. Makes sure that we don't create a bio that spans across
1630  * multiple osd objects. One exception would be with a single page bios,
1631  * which we handle later at bio_chain_clone_range()
1632  */
1633 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1634                           struct bio_vec *bvec)
1635 {
1636         struct rbd_device *rbd_dev = q->queuedata;
1637         sector_t sector_offset;
1638         sector_t sectors_per_obj;
1639         sector_t obj_sector_offset;
1640         int ret;
1641
1642         /*
1643          * Find how far into its rbd object the partition-relative
1644          * bio start sector is to offset relative to the enclosing
1645          * device.
1646          */
1647         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1648         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1649         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1650
1651         /*
1652          * Compute the number of bytes from that offset to the end
1653          * of the object.  Account for what's already used by the bio.
1654          */
1655         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1656         if (ret > bmd->bi_size)
1657                 ret -= bmd->bi_size;
1658         else
1659                 ret = 0;
1660
1661         /*
1662          * Don't send back more than was asked for.  And if the bio
1663          * was empty, let the whole thing through because:  "Note
1664          * that a block device *must* allow a single page to be
1665          * added to an empty bio."
1666          */
1667         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1668         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1669                 ret = (int) bvec->bv_len;
1670
1671         return ret;
1672 }
1673
1674 static void rbd_free_disk(struct rbd_device *rbd_dev)
1675 {
1676         struct gendisk *disk = rbd_dev->disk;
1677
1678         if (!disk)
1679                 return;
1680
1681         if (disk->flags & GENHD_FL_UP)
1682                 del_gendisk(disk);
1683         if (disk->queue)
1684                 blk_cleanup_queue(disk->queue);
1685         put_disk(disk);
1686 }
1687
1688 /*
1689  * Read the complete header for the given rbd device.
1690  *
1691  * Returns a pointer to a dynamically-allocated buffer containing
1692  * the complete and validated header.  Caller can pass the address
1693  * of a variable that will be filled in with the version of the
1694  * header object at the time it was read.
1695  *
1696  * Returns a pointer-coded errno if a failure occurs.
1697  */
1698 static struct rbd_image_header_ondisk *
1699 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1700 {
1701         struct rbd_image_header_ondisk *ondisk = NULL;
1702         u32 snap_count = 0;
1703         u64 names_size = 0;
1704         u32 want_count;
1705         int ret;
1706
1707         /*
1708          * The complete header will include an array of its 64-bit
1709          * snapshot ids, followed by the names of those snapshots as
1710          * a contiguous block of NUL-terminated strings.  Note that
1711          * the number of snapshots could change by the time we read
1712          * it in, in which case we re-read it.
1713          */
1714         do {
1715                 size_t size;
1716
1717                 kfree(ondisk);
1718
1719                 size = sizeof (*ondisk);
1720                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1721                 size += names_size;
1722                 ondisk = kmalloc(size, GFP_KERNEL);
1723                 if (!ondisk)
1724                         return ERR_PTR(-ENOMEM);
1725
1726                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1727                                        rbd_dev->header_name,
1728                                        0, size,
1729                                        (char *) ondisk, version);
1730
1731                 if (ret < 0)
1732                         goto out_err;
1733                 if (WARN_ON((size_t) ret < size)) {
1734                         ret = -ENXIO;
1735                         pr_warning("short header read for image %s"
1736                                         " (want %zd got %d)\n",
1737                                 rbd_dev->spec->image_name, size, ret);
1738                         goto out_err;
1739                 }
1740                 if (!rbd_dev_ondisk_valid(ondisk)) {
1741                         ret = -ENXIO;
1742                         pr_warning("invalid header for image %s\n",
1743                                 rbd_dev->spec->image_name);
1744                         goto out_err;
1745                 }
1746
1747                 names_size = le64_to_cpu(ondisk->snap_names_len);
1748                 want_count = snap_count;
1749                 snap_count = le32_to_cpu(ondisk->snap_count);
1750         } while (snap_count != want_count);
1751
1752         return ondisk;
1753
1754 out_err:
1755         kfree(ondisk);
1756
1757         return ERR_PTR(ret);
1758 }
1759
1760 /*
1761  * reload the ondisk the header
1762  */
1763 static int rbd_read_header(struct rbd_device *rbd_dev,
1764                            struct rbd_image_header *header)
1765 {
1766         struct rbd_image_header_ondisk *ondisk;
1767         u64 ver = 0;
1768         int ret;
1769
1770         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1771         if (IS_ERR(ondisk))
1772                 return PTR_ERR(ondisk);
1773         ret = rbd_header_from_disk(header, ondisk);
1774         if (ret >= 0)
1775                 header->obj_version = ver;
1776         kfree(ondisk);
1777
1778         return ret;
1779 }
1780
1781 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1782 {
1783         struct rbd_snap *snap;
1784         struct rbd_snap *next;
1785
1786         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1787                 rbd_remove_snap_dev(snap);
1788 }
1789
1790 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1791 {
1792         sector_t size;
1793
1794         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1795                 return;
1796
1797         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1798         dout("setting size to %llu sectors", (unsigned long long) size);
1799         rbd_dev->mapping.size = (u64) size;
1800         set_capacity(rbd_dev->disk, size);
1801 }
1802
1803 /*
1804  * only read the first part of the ondisk header, without the snaps info
1805  */
1806 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1807 {
1808         int ret;
1809         struct rbd_image_header h;
1810
1811         ret = rbd_read_header(rbd_dev, &h);
1812         if (ret < 0)
1813                 return ret;
1814
1815         down_write(&rbd_dev->header_rwsem);
1816
1817         /* Update image size, and check for resize of mapped image */
1818         rbd_dev->header.image_size = h.image_size;
1819         rbd_update_mapping_size(rbd_dev);
1820
1821         /* rbd_dev->header.object_prefix shouldn't change */
1822         kfree(rbd_dev->header.snap_sizes);
1823         kfree(rbd_dev->header.snap_names);
1824         /* osd requests may still refer to snapc */
1825         ceph_put_snap_context(rbd_dev->header.snapc);
1826
1827         if (hver)
1828                 *hver = h.obj_version;
1829         rbd_dev->header.obj_version = h.obj_version;
1830         rbd_dev->header.image_size = h.image_size;
1831         rbd_dev->header.snapc = h.snapc;
1832         rbd_dev->header.snap_names = h.snap_names;
1833         rbd_dev->header.snap_sizes = h.snap_sizes;
1834         /* Free the extra copy of the object prefix */
1835         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1836         kfree(h.object_prefix);
1837
1838         ret = rbd_dev_snaps_update(rbd_dev);
1839         if (!ret)
1840                 ret = rbd_dev_snaps_register(rbd_dev);
1841
1842         up_write(&rbd_dev->header_rwsem);
1843
1844         return ret;
1845 }
1846
1847 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1848 {
1849         int ret;
1850
1851         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1852         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1853         if (rbd_dev->image_format == 1)
1854                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1855         else
1856                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1857         mutex_unlock(&ctl_mutex);
1858
1859         return ret;
1860 }
1861
1862 static int rbd_init_disk(struct rbd_device *rbd_dev)
1863 {
1864         struct gendisk *disk;
1865         struct request_queue *q;
1866         u64 segment_size;
1867
1868         /* create gendisk info */
1869         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1870         if (!disk)
1871                 return -ENOMEM;
1872
1873         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1874                  rbd_dev->dev_id);
1875         disk->major = rbd_dev->major;
1876         disk->first_minor = 0;
1877         disk->fops = &rbd_bd_ops;
1878         disk->private_data = rbd_dev;
1879
1880         /* init rq */
1881         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1882         if (!q)
1883                 goto out_disk;
1884
1885         /* We use the default size, but let's be explicit about it. */
1886         blk_queue_physical_block_size(q, SECTOR_SIZE);
1887
1888         /* set io sizes to object size */
1889         segment_size = rbd_obj_bytes(&rbd_dev->header);
1890         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1891         blk_queue_max_segment_size(q, segment_size);
1892         blk_queue_io_min(q, segment_size);
1893         blk_queue_io_opt(q, segment_size);
1894
1895         blk_queue_merge_bvec(q, rbd_merge_bvec);
1896         disk->queue = q;
1897
1898         q->queuedata = rbd_dev;
1899
1900         rbd_dev->disk = disk;
1901
1902         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1903
1904         return 0;
1905 out_disk:
1906         put_disk(disk);
1907
1908         return -ENOMEM;
1909 }
1910
1911 /*
1912   sysfs
1913 */
1914
1915 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1916 {
1917         return container_of(dev, struct rbd_device, dev);
1918 }
1919
1920 static ssize_t rbd_size_show(struct device *dev,
1921                              struct device_attribute *attr, char *buf)
1922 {
1923         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924         sector_t size;
1925
1926         down_read(&rbd_dev->header_rwsem);
1927         size = get_capacity(rbd_dev->disk);
1928         up_read(&rbd_dev->header_rwsem);
1929
1930         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1931 }
1932
1933 /*
1934  * Note this shows the features for whatever's mapped, which is not
1935  * necessarily the base image.
1936  */
1937 static ssize_t rbd_features_show(struct device *dev,
1938                              struct device_attribute *attr, char *buf)
1939 {
1940         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1941
1942         return sprintf(buf, "0x%016llx\n",
1943                         (unsigned long long) rbd_dev->mapping.features);
1944 }
1945
1946 static ssize_t rbd_major_show(struct device *dev,
1947                               struct device_attribute *attr, char *buf)
1948 {
1949         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1950
1951         return sprintf(buf, "%d\n", rbd_dev->major);
1952 }
1953
1954 static ssize_t rbd_client_id_show(struct device *dev,
1955                                   struct device_attribute *attr, char *buf)
1956 {
1957         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1958
1959         return sprintf(buf, "client%lld\n",
1960                         ceph_client_id(rbd_dev->rbd_client->client));
1961 }
1962
1963 static ssize_t rbd_pool_show(struct device *dev,
1964                              struct device_attribute *attr, char *buf)
1965 {
1966         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1967
1968         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1969 }
1970
1971 static ssize_t rbd_pool_id_show(struct device *dev,
1972                              struct device_attribute *attr, char *buf)
1973 {
1974         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1975
1976         return sprintf(buf, "%llu\n",
1977                 (unsigned long long) rbd_dev->spec->pool_id);
1978 }
1979
1980 static ssize_t rbd_name_show(struct device *dev,
1981                              struct device_attribute *attr, char *buf)
1982 {
1983         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984
1985         return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
1986 }
1987
1988 static ssize_t rbd_image_id_show(struct device *dev,
1989                              struct device_attribute *attr, char *buf)
1990 {
1991         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1992
1993         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
1994 }
1995
1996 /*
1997  * Shows the name of the currently-mapped snapshot (or
1998  * RBD_SNAP_HEAD_NAME for the base image).
1999  */
2000 static ssize_t rbd_snap_show(struct device *dev,
2001                              struct device_attribute *attr,
2002                              char *buf)
2003 {
2004         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2005
2006         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2007 }
2008
2009 static ssize_t rbd_image_refresh(struct device *dev,
2010                                  struct device_attribute *attr,
2011                                  const char *buf,
2012                                  size_t size)
2013 {
2014         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2015         int ret;
2016
2017         ret = rbd_dev_refresh(rbd_dev, NULL);
2018
2019         return ret < 0 ? ret : size;
2020 }
2021
/*
 * Per-device sysfs attributes.  All are read-only (S_IRUGO, show
 * handler only) except "refresh", which is owner-writable only
 * (S_IWUSR, store handler only).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

/* NULL-terminated list of the attributes defined above */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        NULL
};

/* Unnamed group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
2055
/*
 * Release callback installed in rbd_device_type.  It is deliberately
 * empty; rbd_bus_add_dev() separately sets dev->release to
 * rbd_dev_release for each rbd device.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type assigned to each rbd device in rbd_bus_add_dev() */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2065
2066
2067 /*
2068   sysfs - snapshots
2069 */
2070
2071 static ssize_t rbd_snap_size_show(struct device *dev,
2072                                   struct device_attribute *attr,
2073                                   char *buf)
2074 {
2075         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2076
2077         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2078 }
2079
2080 static ssize_t rbd_snap_id_show(struct device *dev,
2081                                 struct device_attribute *attr,
2082                                 char *buf)
2083 {
2084         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2085
2086         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2087 }
2088
2089 static ssize_t rbd_snap_features_show(struct device *dev,
2090                                 struct device_attribute *attr,
2091                                 char *buf)
2092 {
2093         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2094
2095         return sprintf(buf, "0x%016llx\n",
2096                         (unsigned long long) snap->features);
2097 }
2098
/* Per-snapshot sysfs attributes; all are read-only (S_IRUGO) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the snapshot attributes defined above */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

/* Unnamed group wrapping rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2113
2114 static void rbd_snap_dev_release(struct device *dev)
2115 {
2116         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2117         kfree(snap->name);
2118         kfree(snap);
2119 }
2120
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/*
 * Device type assigned to snapshot devices in rbd_register_snap_dev().
 * rbd_snap_registered() also compares against its address to tell
 * whether a snapshot has been through registration.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2130
2131 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2132 {
2133         kref_get(&spec->kref);
2134
2135         return spec;
2136 }
2137
2138 static void rbd_spec_free(struct kref *kref);
2139 static void rbd_spec_put(struct rbd_spec *spec)
2140 {
2141         if (spec)
2142                 kref_put(&spec->kref, rbd_spec_free);
2143 }
2144
2145 static struct rbd_spec *rbd_spec_alloc(void)
2146 {
2147         struct rbd_spec *spec;
2148
2149         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2150         if (!spec)
2151                 return NULL;
2152         kref_init(&spec->kref);
2153
2154         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2155
2156         return spec;
2157 }
2158
2159 static void rbd_spec_free(struct kref *kref)
2160 {
2161         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2162
2163         kfree(spec->pool_name);
2164         kfree(spec->image_id);
2165         kfree(spec->image_name);
2166         kfree(spec->snap_name);
2167         kfree(spec);
2168 }
2169
2170 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2171                                 struct rbd_spec *spec)
2172 {
2173         struct rbd_device *rbd_dev;
2174
2175         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2176         if (!rbd_dev)
2177                 return NULL;
2178
2179         spin_lock_init(&rbd_dev->lock);
2180         INIT_LIST_HEAD(&rbd_dev->node);
2181         INIT_LIST_HEAD(&rbd_dev->snaps);
2182         init_rwsem(&rbd_dev->header_rwsem);
2183
2184         rbd_dev->spec = spec;
2185         rbd_dev->rbd_client = rbdc;
2186
2187         return rbd_dev;
2188 }
2189
2190 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2191 {
2192         kfree(rbd_dev->header_name);
2193         rbd_put_client(rbd_dev->rbd_client);
2194         rbd_spec_put(rbd_dev->spec);
2195         kfree(rbd_dev);
2196 }
2197
2198 static bool rbd_snap_registered(struct rbd_snap *snap)
2199 {
2200         bool ret = snap->dev.type == &rbd_snap_device_type;
2201         bool reg = device_is_registered(&snap->dev);
2202
2203         rbd_assert(!ret ^ reg);
2204
2205         return ret;
2206 }
2207
2208 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2209 {
2210         list_del(&snap->node);
2211         if (device_is_registered(&snap->dev))
2212                 device_unregister(&snap->dev);
2213 }
2214
2215 static int rbd_register_snap_dev(struct rbd_snap *snap,
2216                                   struct device *parent)
2217 {
2218         struct device *dev = &snap->dev;
2219         int ret;
2220
2221         dev->type = &rbd_snap_device_type;
2222         dev->parent = parent;
2223         dev->release = rbd_snap_dev_release;
2224         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2225         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2226
2227         ret = device_register(dev);
2228
2229         return ret;
2230 }
2231
2232 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2233                                                 const char *snap_name,
2234                                                 u64 snap_id, u64 snap_size,
2235                                                 u64 snap_features)
2236 {
2237         struct rbd_snap *snap;
2238         int ret;
2239
2240         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2241         if (!snap)
2242                 return ERR_PTR(-ENOMEM);
2243
2244         ret = -ENOMEM;
2245         snap->name = kstrdup(snap_name, GFP_KERNEL);
2246         if (!snap->name)
2247                 goto err;
2248
2249         snap->id = snap_id;
2250         snap->size = snap_size;
2251         snap->features = snap_features;
2252
2253         return snap;
2254
2255 err:
2256         kfree(snap->name);
2257         kfree(snap);
2258
2259         return ERR_PTR(ret);
2260 }
2261
2262 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2263                 u64 *snap_size, u64 *snap_features)
2264 {
2265         char *snap_name;
2266
2267         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2268
2269         *snap_size = rbd_dev->header.snap_sizes[which];
2270         *snap_features = 0;     /* No features for v1 */
2271
2272         /* Skip over names until we find the one we are looking for */
2273
2274         snap_name = rbd_dev->header.snap_names;
2275         while (which--)
2276                 snap_name += strlen(snap_name) + 1;
2277
2278         return snap_name;
2279 }
2280
2281 /*
2282  * Get the size and object order for an image snapshot, or if
2283  * snap_id is CEPH_NOSNAP, gets this information for the base
2284  * image.
2285  */
2286 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2287                                 u8 *order, u64 *snap_size)
2288 {
2289         __le64 snapid = cpu_to_le64(snap_id);
2290         int ret;
2291         struct {
2292                 u8 order;
2293                 __le64 size;
2294         } __attribute__ ((packed)) size_buf = { 0 };
2295
2296         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2297                                 "rbd", "get_size",
2298                                 (char *) &snapid, sizeof (snapid),
2299                                 (char *) &size_buf, sizeof (size_buf),
2300                                 CEPH_OSD_FLAG_READ, NULL);
2301         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2302         if (ret < 0)
2303                 return ret;
2304
2305         *order = size_buf.order;
2306         *snap_size = le64_to_cpu(size_buf.size);
2307
2308         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2309                 (unsigned long long) snap_id, (unsigned int) *order,
2310                 (unsigned long long) *snap_size);
2311
2312         return 0;
2313 }
2314
2315 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2316 {
2317         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2318                                         &rbd_dev->header.obj_order,
2319                                         &rbd_dev->header.image_size);
2320 }
2321
2322 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2323 {
2324         void *reply_buf;
2325         int ret;
2326         void *p;
2327
2328         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2329         if (!reply_buf)
2330                 return -ENOMEM;
2331
2332         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2333                                 "rbd", "get_object_prefix",
2334                                 NULL, 0,
2335                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2336                                 CEPH_OSD_FLAG_READ, NULL);
2337         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2338         if (ret < 0)
2339                 goto out;
2340         ret = 0;    /* rbd_req_sync_exec() can return positive */
2341
2342         p = reply_buf;
2343         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2344                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2345                                                 NULL, GFP_NOIO);
2346
2347         if (IS_ERR(rbd_dev->header.object_prefix)) {
2348                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2349                 rbd_dev->header.object_prefix = NULL;
2350         } else {
2351                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2352         }
2353
2354 out:
2355         kfree(reply_buf);
2356
2357         return ret;
2358 }
2359
2360 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2361                 u64 *snap_features)
2362 {
2363         __le64 snapid = cpu_to_le64(snap_id);
2364         struct {
2365                 __le64 features;
2366                 __le64 incompat;
2367         } features_buf = { 0 };
2368         u64 incompat;
2369         int ret;
2370
2371         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2372                                 "rbd", "get_features",
2373                                 (char *) &snapid, sizeof (snapid),
2374                                 (char *) &features_buf, sizeof (features_buf),
2375                                 CEPH_OSD_FLAG_READ, NULL);
2376         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2377         if (ret < 0)
2378                 return ret;
2379
2380         incompat = le64_to_cpu(features_buf.incompat);
2381         if (incompat & ~RBD_FEATURES_ALL)
2382                 return -ENOTSUPP;
2383
2384         *snap_features = le64_to_cpu(features_buf.features);
2385
2386         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2387                 (unsigned long long) snap_id,
2388                 (unsigned long long) *snap_features,
2389                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2390
2391         return 0;
2392 }
2393
2394 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2395 {
2396         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2397                                                 &rbd_dev->header.features);
2398 }
2399
2400 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2401 {
2402         size_t size;
2403         int ret;
2404         void *reply_buf;
2405         void *p;
2406         void *end;
2407         u64 seq;
2408         u32 snap_count;
2409         struct ceph_snap_context *snapc;
2410         u32 i;
2411
2412         /*
2413          * We'll need room for the seq value (maximum snapshot id),
2414          * snapshot count, and array of that many snapshot ids.
2415          * For now we have a fixed upper limit on the number we're
2416          * prepared to receive.
2417          */
2418         size = sizeof (__le64) + sizeof (__le32) +
2419                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2420         reply_buf = kzalloc(size, GFP_KERNEL);
2421         if (!reply_buf)
2422                 return -ENOMEM;
2423
2424         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2425                                 "rbd", "get_snapcontext",
2426                                 NULL, 0,
2427                                 reply_buf, size,
2428                                 CEPH_OSD_FLAG_READ, ver);
2429         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2430         if (ret < 0)
2431                 goto out;
2432
2433         ret = -ERANGE;
2434         p = reply_buf;
2435         end = (char *) reply_buf + size;
2436         ceph_decode_64_safe(&p, end, seq, out);
2437         ceph_decode_32_safe(&p, end, snap_count, out);
2438
2439         /*
2440          * Make sure the reported number of snapshot ids wouldn't go
2441          * beyond the end of our buffer.  But before checking that,
2442          * make sure the computed size of the snapshot context we
2443          * allocate is representable in a size_t.
2444          */
2445         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2446                                  / sizeof (u64)) {
2447                 ret = -EINVAL;
2448                 goto out;
2449         }
2450         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2451                 goto out;
2452
2453         size = sizeof (struct ceph_snap_context) +
2454                                 snap_count * sizeof (snapc->snaps[0]);
2455         snapc = kmalloc(size, GFP_KERNEL);
2456         if (!snapc) {
2457                 ret = -ENOMEM;
2458                 goto out;
2459         }
2460
2461         atomic_set(&snapc->nref, 1);
2462         snapc->seq = seq;
2463         snapc->num_snaps = snap_count;
2464         for (i = 0; i < snap_count; i++)
2465                 snapc->snaps[i] = ceph_decode_64(&p);
2466
2467         rbd_dev->header.snapc = snapc;
2468
2469         dout("  snap context seq = %llu, snap_count = %u\n",
2470                 (unsigned long long) seq, (unsigned int) snap_count);
2471
2472 out:
2473         kfree(reply_buf);
2474
2475         return 0;
2476 }
2477
2478 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2479 {
2480         size_t size;
2481         void *reply_buf;
2482         __le64 snap_id;
2483         int ret;
2484         void *p;
2485         void *end;
2486         char *snap_name;
2487
2488         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2489         reply_buf = kmalloc(size, GFP_KERNEL);
2490         if (!reply_buf)
2491                 return ERR_PTR(-ENOMEM);
2492
2493         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2494         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2495                                 "rbd", "get_snapshot_name",
2496                                 (char *) &snap_id, sizeof (snap_id),
2497                                 reply_buf, size,
2498                                 CEPH_OSD_FLAG_READ, NULL);
2499         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2500         if (ret < 0)
2501                 goto out;
2502
2503         p = reply_buf;
2504         end = (char *) reply_buf + size;
2505         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2506         if (IS_ERR(snap_name)) {
2507                 ret = PTR_ERR(snap_name);
2508                 goto out;
2509         } else {
2510                 dout("  snap_id 0x%016llx snap_name = %s\n",
2511                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2512         }
2513         kfree(reply_buf);
2514
2515         return snap_name;
2516 out:
2517         kfree(reply_buf);
2518
2519         return ERR_PTR(ret);
2520 }
2521
2522 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2523                 u64 *snap_size, u64 *snap_features)
2524 {
2525         __le64 snap_id;
2526         u8 order;
2527         int ret;
2528
2529         snap_id = rbd_dev->header.snapc->snaps[which];
2530         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2531         if (ret)
2532                 return ERR_PTR(ret);
2533         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2534         if (ret)
2535                 return ERR_PTR(ret);
2536
2537         return rbd_dev_v2_snap_name(rbd_dev, which);
2538 }
2539
2540 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2541                 u64 *snap_size, u64 *snap_features)
2542 {
2543         if (rbd_dev->image_format == 1)
2544                 return rbd_dev_v1_snap_info(rbd_dev, which,
2545                                         snap_size, snap_features);
2546         if (rbd_dev->image_format == 2)
2547                 return rbd_dev_v2_snap_info(rbd_dev, which,
2548                                         snap_size, snap_features);
2549         return ERR_PTR(-EINVAL);
2550 }
2551
2552 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2553 {
2554         int ret;
2555         __u8 obj_order;
2556
2557         down_write(&rbd_dev->header_rwsem);
2558
2559         /* Grab old order first, to see if it changes */
2560
2561         obj_order = rbd_dev->header.obj_order,
2562         ret = rbd_dev_v2_image_size(rbd_dev);
2563         if (ret)
2564                 goto out;
2565         if (rbd_dev->header.obj_order != obj_order) {
2566                 ret = -EIO;
2567                 goto out;
2568         }
2569         rbd_update_mapping_size(rbd_dev);
2570
2571         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2572         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2573         if (ret)
2574                 goto out;
2575         ret = rbd_dev_snaps_update(rbd_dev);
2576         dout("rbd_dev_snaps_update returned %d\n", ret);
2577         if (ret)
2578                 goto out;
2579         ret = rbd_dev_snaps_register(rbd_dev);
2580         dout("rbd_dev_snaps_register returned %d\n", ret);
2581 out:
2582         up_write(&rbd_dev->header_rwsem);
2583
2584         return ret;
2585 }
2586
2587 /*
2588  * Scan the rbd device's current snapshot list and compare it to the
2589  * newly-received snapshot context.  Remove any existing snapshots
2590  * not present in the new snapshot context.  Add a new snapshot for
2591  * any snaphots in the snapshot context not in the current list.
2592  * And verify there are no changes to snapshots we already know
2593  * about.
2594  *
2595  * Assumes the snapshots in the snapshot context are sorted by
2596  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2597  * are also maintained in that order.)
2598  */
2599 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2600 {
2601         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2602         const u32 snap_count = snapc->num_snaps;
2603         struct list_head *head = &rbd_dev->snaps;
2604         struct list_head *links = head->next;
2605         u32 index = 0;
2606
2607         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2608         while (index < snap_count || links != head) {
2609                 u64 snap_id;
2610                 struct rbd_snap *snap;
2611                 char *snap_name;
2612                 u64 snap_size = 0;
2613                 u64 snap_features = 0;
2614
2615                 snap_id = index < snap_count ? snapc->snaps[index]
2616                                              : CEPH_NOSNAP;
2617                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2618                                      : NULL;
2619                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2620
2621                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2622                         struct list_head *next = links->next;
2623
2624                         /* Existing snapshot not in the new snap context */
2625
2626                         if (rbd_dev->spec->snap_id == snap->id)
2627                                 rbd_dev->exists = false;
2628                         rbd_remove_snap_dev(snap);
2629                         dout("%ssnap id %llu has been removed\n",
2630                                 rbd_dev->spec->snap_id == snap->id ?
2631                                                         "mapped " : "",
2632                                 (unsigned long long) snap->id);
2633
2634                         /* Done with this list entry; advance */
2635
2636                         links = next;
2637                         continue;
2638                 }
2639
2640                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2641                                         &snap_size, &snap_features);
2642                 if (IS_ERR(snap_name))
2643                         return PTR_ERR(snap_name);
2644
2645                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2646                         (unsigned long long) snap_id);
2647                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2648                         struct rbd_snap *new_snap;
2649
2650                         /* We haven't seen this snapshot before */
2651
2652                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2653                                         snap_id, snap_size, snap_features);
2654                         if (IS_ERR(new_snap)) {
2655                                 int err = PTR_ERR(new_snap);
2656
2657                                 dout("  failed to add dev, error %d\n", err);
2658
2659                                 return err;
2660                         }
2661
2662                         /* New goes before existing, or at end of list */
2663
2664                         dout("  added dev%s\n", snap ? "" : " at end\n");
2665                         if (snap)
2666                                 list_add_tail(&new_snap->node, &snap->node);
2667                         else
2668                                 list_add_tail(&new_snap->node, head);
2669                 } else {
2670                         /* Already have this one */
2671
2672                         dout("  already present\n");
2673
2674                         rbd_assert(snap->size == snap_size);
2675                         rbd_assert(!strcmp(snap->name, snap_name));
2676                         rbd_assert(snap->features == snap_features);
2677
2678                         /* Done with this list entry; advance */
2679
2680                         links = links->next;
2681                 }
2682
2683                 /* Advance to the next entry in the snapshot context */
2684
2685                 index++;
2686         }
2687         dout("%s: done\n", __func__);
2688
2689         return 0;
2690 }
2691
2692 /*
2693  * Scan the list of snapshots and register the devices for any that
2694  * have not already been registered.
2695  */
2696 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2697 {
2698         struct rbd_snap *snap;
2699         int ret = 0;
2700
2701         dout("%s called\n", __func__);
2702         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2703                 return -EIO;
2704
2705         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2706                 if (!rbd_snap_registered(snap)) {
2707                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2708                         if (ret < 0)
2709                                 break;
2710                 }
2711         }
2712         dout("%s: returning %d\n", __func__, ret);
2713
2714         return ret;
2715 }
2716
2717 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2718 {
2719         struct device *dev;
2720         int ret;
2721
2722         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2723
2724         dev = &rbd_dev->dev;
2725         dev->bus = &rbd_bus_type;
2726         dev->type = &rbd_device_type;
2727         dev->parent = &rbd_root_dev;
2728         dev->release = rbd_dev_release;
2729         dev_set_name(dev, "%d", rbd_dev->dev_id);
2730         ret = device_register(dev);
2731
2732         mutex_unlock(&ctl_mutex);
2733
2734         return ret;
2735 }
2736
2737 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2738 {
2739         device_unregister(&rbd_dev->dev);
2740 }
2741
2742 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2743 {
2744         int ret, rc;
2745
2746         do {
2747                 ret = rbd_req_sync_watch(rbd_dev);
2748                 if (ret == -ERANGE) {
2749                         rc = rbd_dev_refresh(rbd_dev, NULL);
2750                         if (rc < 0)
2751                                 return rc;
2752                 }
2753         } while (ret == -ERANGE);
2754
2755         return ret;
2756 }
2757
2758 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2759
2760 /*
2761  * Get a unique rbd identifier for the given new rbd_dev, and add
2762  * the rbd_dev to the global list.  The minimum rbd id is 1.
2763  */
2764 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2765 {
2766         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2767
2768         spin_lock(&rbd_dev_list_lock);
2769         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2770         spin_unlock(&rbd_dev_list_lock);
2771         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2772                 (unsigned long long) rbd_dev->dev_id);
2773 }
2774
2775 /*
2776  * Remove an rbd_dev from the global list, and record that its
2777  * identifier is no longer in use.
2778  */
2779 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2780 {
2781         struct list_head *tmp;
2782         int rbd_id = rbd_dev->dev_id;
2783         int max_id;
2784
2785         rbd_assert(rbd_id > 0);
2786
2787         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2788                 (unsigned long long) rbd_dev->dev_id);
2789         spin_lock(&rbd_dev_list_lock);
2790         list_del_init(&rbd_dev->node);
2791
2792         /*
2793          * If the id being "put" is not the current maximum, there
2794          * is nothing special we need to do.
2795          */
2796         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2797                 spin_unlock(&rbd_dev_list_lock);
2798                 return;
2799         }
2800
2801         /*
2802          * We need to update the current maximum id.  Search the
2803          * list to find out what it is.  We're more likely to find
2804          * the maximum at the end, so search the list backward.
2805          */
2806         max_id = 0;
2807         list_for_each_prev(tmp, &rbd_dev_list) {
2808                 struct rbd_device *rbd_dev;
2809
2810                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2811                 if (rbd_dev->dev_id > max_id)
2812                         max_id = rbd_dev->dev_id;
2813         }
2814         spin_unlock(&rbd_dev_list_lock);
2815
2816         /*
2817          * The max id could have been updated by rbd_dev_id_get(), in
2818          * which case it now accurately reflects the new maximum.
2819          * Be careful not to overwrite the maximum value in that
2820          * case.
2821          */
2822         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2823         dout("  max dev id has been reset\n");
2824 }
2825
/*
 * Advance *buf past any leading white space so that it points at
 * the start of the next token (or at the terminating '\0' if there
 * is none), and return the length of that token, i.e. the number of
 * consecutive non-white-space characters found there.
 *
 * *buf must point to a '\0'-terminated string.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * Exactly the characters for which isspace() is nonzero in
	 * the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;

	return strcspn(start, whitespace);
}
2844
/*
 * Locate the next token in *buf and, if it fits, copy it into the
 * caller-supplied token buffer as a '\0'-terminated string.  The
 * token fits only if its length is strictly less than token_size
 * (room is needed for the terminator); otherwise the token buffer
 * is left untouched.  *buf must be '\0'-terminated on entry.
 *
 * Returns the token length (excluding the '\0'): 0 means no token
 * was found, and a value >= token_size means the token was too big
 * to copy.
 *
 * In every case *buf is advanced to just past the token that was
 * found, even when it could not be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *start = *buf;

	*buf = start + token_len;
	if (token_len >= token_size)
		return token_len;	/* Won't fit; nothing copied */

	memcpy(token, start, token_len);
	token[token_len] = '\0';

	return token_len;
}
2874
2875 /*
2876  * Finds the next token in *buf, dynamically allocates a buffer big
2877  * enough to hold a copy of it, and copies the token into the new
2878  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2879  * that a duplicate buffer is created even for a zero-length token.
2880  *
2881  * Returns a pointer to the newly-allocated duplicate, or a null
2882  * pointer if memory for the duplicate was not available.  If
2883  * the lenp argument is a non-null pointer, the length of the token
2884  * (not including the '\0') is returned in *lenp.
2885  *
2886  * If successful, the *buf pointer will be updated to point beyond
2887  * the end of the found token.
2888  *
2889  * Note: uses GFP_KERNEL for allocation.
2890  */
2891 static inline char *dup_token(const char **buf, size_t *lenp)
2892 {
2893         char *dup;
2894         size_t len;
2895
2896         len = next_token(buf);
2897         dup = kmalloc(len + 1, GFP_KERNEL);
2898         if (!dup)
2899                 return NULL;
2900
2901         memcpy(dup, *buf, len);
2902         *(dup + len) = '\0';
2903         *buf += len;
2904
2905         if (lenp)
2906                 *lenp = len;
2907
2908         return dup;
2909 }
2910
2911 /*
2912  * Parse the options provided for an "rbd add" (i.e., rbd image
2913  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
2914  * and the data written is passed here via a NUL-terminated buffer.
2915  * Returns 0 if successful or an error code otherwise.
2916  *
2917  * The information extracted from these options is recorded in
2918  * the other parameters which return dynamically-allocated
2919  * structures:
2920  *  ceph_opts
2921  *      The address of a pointer that will refer to a ceph options
2922  *      structure.  Caller must release the returned pointer using
2923  *      ceph_destroy_options() when it is no longer needed.
2924  *  rbd_opts
2925  *      Address of an rbd options pointer.  Fully initialized by
2926  *      this function; caller must release with kfree().
2927  *  spec
2928  *      Address of an rbd image specification pointer.  Fully
2929  *      initialized by this function based on parsed options.
2930  *      Caller must release with rbd_spec_put().
2931  *
2932  * The options passed take this form:
2933  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
2934  * where:
2935  *  <mon_addrs>
2936  *      A comma-separated list of one or more monitor addresses.
2937  *      A monitor address is an ip address, optionally followed
2938  *      by a port number (separated by a colon).
2939  *        I.e.:  ip1[:port1][,ip2[:port2]...]
2940  *  <options>
2941  *      A comma-separated list of ceph and/or rbd options.
2942  *  <pool_name>
2943  *      The name of the rados pool containing the rbd image.
2944  *  <image_name>
2945  *      The name of the image in that pool to map.
2946  *  <snap_id>
2947  *      An optional snapshot id.  If provided, the mapping will
2948  *      present data from the image at the time that snapshot was
2949  *      created.  The image head is used if no snapshot id is
2950  *      provided.  Snapshot mappings are always read-only.
2951  */
2952 static int rbd_add_parse_args(const char *buf,
2953                                 struct ceph_options **ceph_opts,
2954                                 struct rbd_options **opts,
2955                                 struct rbd_spec **rbd_spec)
2956 {
2957         size_t len;
2958         char *options;
2959         const char *mon_addrs;
2960         size_t mon_addrs_size;
2961         struct rbd_spec *spec = NULL;
2962         struct rbd_options *rbd_opts = NULL;
2963         struct ceph_options *copts;
2964         int ret;
2965
2966         /* The first four tokens are required */
2967
2968         len = next_token(&buf);
2969         if (!len)
2970                 return -EINVAL; /* Missing monitor address(es) */
2971         mon_addrs = buf;
2972         mon_addrs_size = len + 1;
2973         buf += len;
2974
2975         ret = -EINVAL;
2976         options = dup_token(&buf, NULL);
2977         if (!options)
2978                 return -ENOMEM;
2979         if (!*options)
2980                 goto out_err;   /* Missing options */
2981
2982         spec = rbd_spec_alloc();
2983         if (!spec)
2984                 goto out_mem;
2985
2986         spec->pool_name = dup_token(&buf, NULL);
2987         if (!spec->pool_name)
2988                 goto out_mem;
2989         if (!*spec->pool_name)
2990                 goto out_err;   /* Missing pool name */
2991
2992         spec->image_name = dup_token(&buf, &spec->image_name_len);
2993         if (!spec->image_name)
2994                 goto out_mem;
2995         if (!*spec->image_name)
2996                 goto out_err;   /* Missing image name */
2997
2998         /*
2999          * Snapshot name is optional; default is to use "-"
3000          * (indicating the head/no snapshot).
3001          */
3002         len = next_token(&buf);
3003         if (!len) {
3004                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3005                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3006         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3007                 ret = -ENAMETOOLONG;
3008                 goto out_err;
3009         }
3010         spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
3011         if (!spec->snap_name)
3012                 goto out_mem;
3013         memcpy(spec->snap_name, buf, len);
3014         *(spec->snap_name + len) = '\0';
3015
3016         /* Initialize all rbd options to the defaults */
3017
3018         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3019         if (!rbd_opts)
3020                 goto out_mem;
3021
3022         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3023
3024         copts = ceph_parse_options(options, mon_addrs,
3025                                         mon_addrs + mon_addrs_size - 1,
3026                                         parse_rbd_opts_token, rbd_opts);
3027         if (IS_ERR(copts)) {
3028                 ret = PTR_ERR(copts);
3029                 goto out_err;
3030         }
3031         kfree(options);
3032
3033         *ceph_opts = copts;
3034         *opts = rbd_opts;
3035         *rbd_spec = spec;
3036
3037         return 0;
3038 out_mem:
3039         ret = -ENOMEM;
3040 out_err:
3041         kfree(rbd_opts);
3042         rbd_spec_put(spec);
3043         kfree(options);
3044
3045         return ret;
3046 }
3047
3048 /*
3049  * An rbd format 2 image has a unique identifier, distinct from the
3050  * name given to it by the user.  Internally, that identifier is
3051  * what's used to specify the names of objects related to the image.
3052  *
3053  * A special "rbd id" object is used to map an rbd image name to its
3054  * id.  If that object doesn't exist, then there is no v2 rbd image
3055  * with the supplied name.
3056  *
3057  * This function will record the given rbd_dev's image_id field if
3058  * it can be determined, and in that case will return 0.  If any
3059  * errors occur a negative errno will be returned and the rbd_dev's
3060  * image_id field will be unchanged (and should be NULL).
3061  */
3062 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3063 {
3064         int ret;
3065         size_t size;
3066         char *object_name;
3067         void *response;
3068         void *p;
3069
3070         /*
3071          * When probing a parent image, the image id is already
3072          * known (and the image name likely is not).  There's no
3073          * need to fetch the image id again in this case.
3074          */
3075         if (rbd_dev->spec->image_id)
3076                 return 0;
3077
3078         /*
3079          * First, see if the format 2 image id file exists, and if
3080          * so, get the image's persistent id from it.
3081          */
3082         size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
3083         object_name = kmalloc(size, GFP_NOIO);
3084         if (!object_name)
3085                 return -ENOMEM;
3086         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3087         dout("rbd id object name is %s\n", object_name);
3088
3089         /* Response will be an encoded string, which includes a length */
3090
3091         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3092         response = kzalloc(size, GFP_NOIO);
3093         if (!response) {
3094                 ret = -ENOMEM;
3095                 goto out;
3096         }
3097
3098         ret = rbd_req_sync_exec(rbd_dev, object_name,
3099                                 "rbd", "get_id",
3100                                 NULL, 0,
3101                                 response, RBD_IMAGE_ID_LEN_MAX,
3102                                 CEPH_OSD_FLAG_READ, NULL);
3103         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3104         if (ret < 0)
3105                 goto out;
3106         ret = 0;    /* rbd_req_sync_exec() can return positive */
3107
3108         p = response;
3109         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3110                                                 p + RBD_IMAGE_ID_LEN_MAX,
3111                                                 &rbd_dev->spec->image_id_len,
3112                                                 GFP_NOIO);
3113         if (IS_ERR(rbd_dev->spec->image_id)) {
3114                 ret = PTR_ERR(rbd_dev->spec->image_id);
3115                 rbd_dev->spec->image_id = NULL;
3116         } else {
3117                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3118         }
3119 out:
3120         kfree(response);
3121         kfree(object_name);
3122
3123         return ret;
3124 }
3125
3126 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3127 {
3128         int ret;
3129         size_t size;
3130
3131         /* Version 1 images have no id; empty string is used */
3132
3133         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3134         if (!rbd_dev->spec->image_id)
3135                 return -ENOMEM;
3136         rbd_dev->spec->image_id_len = 0;
3137
3138         /* Record the header object name for this rbd image. */
3139
3140         size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3141         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3142         if (!rbd_dev->header_name) {
3143                 ret = -ENOMEM;
3144                 goto out_err;
3145         }
3146         sprintf(rbd_dev->header_name, "%s%s",
3147                 rbd_dev->spec->image_name, RBD_SUFFIX);
3148
3149         /* Populate rbd image metadata */
3150
3151         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3152         if (ret < 0)
3153                 goto out_err;
3154         rbd_dev->image_format = 1;
3155
3156         dout("discovered version 1 image, header name is %s\n",
3157                 rbd_dev->header_name);
3158
3159         return 0;
3160
3161 out_err:
3162         kfree(rbd_dev->header_name);
3163         rbd_dev->header_name = NULL;
3164         kfree(rbd_dev->spec->image_id);
3165         rbd_dev->spec->image_id = NULL;
3166
3167         return ret;
3168 }
3169
3170 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3171 {
3172         size_t size;
3173         int ret;
3174         u64 ver = 0;
3175
3176         /*
3177          * Image id was filled in by the caller.  Record the header
3178          * object name for this rbd image.
3179          */
3180         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
3181         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3182         if (!rbd_dev->header_name)
3183                 return -ENOMEM;
3184         sprintf(rbd_dev->header_name, "%s%s",
3185                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3186
3187         /* Get the size and object order for the image */
3188
3189         ret = rbd_dev_v2_image_size(rbd_dev);
3190         if (ret < 0)
3191                 goto out_err;
3192
3193         /* Get the object prefix (a.k.a. block_name) for the image */
3194
3195         ret = rbd_dev_v2_object_prefix(rbd_dev);
3196         if (ret < 0)
3197                 goto out_err;
3198
3199         /* Get the and check features for the image */
3200
3201         ret = rbd_dev_v2_features(rbd_dev);
3202         if (ret < 0)
3203                 goto out_err;
3204
3205         /* crypto and compression type aren't (yet) supported for v2 images */
3206
3207         rbd_dev->header.crypt_type = 0;
3208         rbd_dev->header.comp_type = 0;
3209
3210         /* Get the snapshot context, plus the header version */
3211
3212         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3213         if (ret)
3214                 goto out_err;
3215         rbd_dev->header.obj_version = ver;
3216
3217         rbd_dev->image_format = 2;
3218
3219         dout("discovered version 2 image, header name is %s\n",
3220                 rbd_dev->header_name);
3221
3222         return 0;
3223 out_err:
3224         kfree(rbd_dev->header_name);
3225         rbd_dev->header_name = NULL;
3226         kfree(rbd_dev->header.object_prefix);
3227         rbd_dev->header.object_prefix = NULL;
3228
3229         return ret;
3230 }
3231
3232 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3233 {
3234         int ret;
3235
3236         /* no need to lock here, as rbd_dev is not registered yet */
3237         ret = rbd_dev_snaps_update(rbd_dev);
3238         if (ret)
3239                 return ret;
3240
3241         ret = rbd_dev_set_mapping(rbd_dev);
3242         if (ret)
3243                 goto err_out_snaps;
3244
3245         /* generate unique id: find highest unique id, add one */
3246         rbd_dev_id_get(rbd_dev);
3247
3248         /* Fill in the device name, now that we have its id. */
3249         BUILD_BUG_ON(DEV_NAME_LEN
3250                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3251         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3252
3253         /* Get our block major device number. */
3254
3255         ret = register_blkdev(0, rbd_dev->name);
3256         if (ret < 0)
3257                 goto err_out_id;
3258         rbd_dev->major = ret;
3259
3260         /* Set up the blkdev mapping. */
3261
3262         ret = rbd_init_disk(rbd_dev);
3263         if (ret)
3264                 goto err_out_blkdev;
3265
3266         ret = rbd_bus_add_dev(rbd_dev);
3267         if (ret)
3268                 goto err_out_disk;
3269
3270         /*
3271          * At this point cleanup in the event of an error is the job
3272          * of the sysfs code (initiated by rbd_bus_del_dev()).
3273          */
3274         down_write(&rbd_dev->header_rwsem);
3275         ret = rbd_dev_snaps_register(rbd_dev);
3276         up_write(&rbd_dev->header_rwsem);
3277         if (ret)
3278                 goto err_out_bus;
3279
3280         ret = rbd_init_watch_dev(rbd_dev);
3281         if (ret)
3282                 goto err_out_bus;
3283
3284         /* Everything's ready.  Announce the disk to the world. */
3285
3286         add_disk(rbd_dev->disk);
3287
3288         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3289                 (unsigned long long) rbd_dev->mapping.size);
3290
3291         return ret;
3292 err_out_bus:
3293         /* this will also clean up rest of rbd_dev stuff */
3294
3295         rbd_bus_del_dev(rbd_dev);
3296
3297         return ret;
3298 err_out_disk:
3299         rbd_free_disk(rbd_dev);
3300 err_out_blkdev:
3301         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3302 err_out_id:
3303         rbd_dev_id_put(rbd_dev);
3304 err_out_snaps:
3305         rbd_remove_all_snaps(rbd_dev);
3306
3307         return ret;
3308 }
3309
3310 /*
3311  * Probe for the existence of the header object for the given rbd
3312  * device.  For format 2 images this includes determining the image
3313  * id.
3314  */
3315 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3316 {
3317         int ret;
3318
3319         /*
3320          * Get the id from the image id object.  If it's not a
3321          * format 2 image, we'll get ENOENT back, and we'll assume
3322          * it's a format 1 image.
3323          */
3324         ret = rbd_dev_image_id(rbd_dev);
3325         if (ret)
3326                 ret = rbd_dev_v1_probe(rbd_dev);
3327         else
3328                 ret = rbd_dev_v2_probe(rbd_dev);
3329         if (ret) {
3330                 dout("probe failed, returning %d\n", ret);
3331
3332                 return ret;
3333         }
3334
3335         ret = rbd_dev_probe_finish(rbd_dev);
3336         if (ret)
3337                 rbd_header_free(&rbd_dev->header);
3338
3339         return ret;
3340 }
3341
3342 static ssize_t rbd_add(struct bus_type *bus,
3343                        const char *buf,
3344                        size_t count)
3345 {
3346         struct rbd_device *rbd_dev = NULL;
3347         struct ceph_options *ceph_opts = NULL;
3348         struct rbd_options *rbd_opts = NULL;
3349         struct rbd_spec *spec = NULL;
3350         struct rbd_client *rbdc;
3351         struct ceph_osd_client *osdc;
3352         int rc = -ENOMEM;
3353
3354         if (!try_module_get(THIS_MODULE))
3355                 return -ENODEV;
3356
3357         /* parse add command */
3358         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3359         if (rc < 0)
3360                 goto err_out_module;
3361
3362         rbdc = rbd_get_client(ceph_opts);
3363         if (IS_ERR(rbdc)) {
3364                 rc = PTR_ERR(rbdc);
3365                 goto err_out_args;
3366         }
3367         ceph_opts = NULL;       /* rbd_dev client now owns this */
3368
3369         /* pick the pool */
3370         osdc = &rbdc->client->osdc;
3371         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3372         if (rc < 0)
3373                 goto err_out_client;
3374         spec->pool_id = (u64) rc;
3375
3376         rbd_dev = rbd_dev_create(rbdc, spec);
3377         if (!rbd_dev)
3378                 goto err_out_client;
3379         rbdc = NULL;            /* rbd_dev now owns this */
3380         spec = NULL;            /* rbd_dev now owns this */
3381
3382         rbd_dev->mapping.read_only = rbd_opts->read_only;
3383         kfree(rbd_opts);
3384         rbd_opts = NULL;        /* done with this */
3385
3386         rc = rbd_dev_probe(rbd_dev);
3387         if (rc < 0)
3388                 goto err_out_rbd_dev;
3389
3390         return count;
3391 err_out_rbd_dev:
3392         rbd_dev_destroy(rbd_dev);
3393 err_out_client:
3394         rbd_put_client(rbdc);
3395 err_out_args:
3396         if (ceph_opts)
3397                 ceph_destroy_options(ceph_opts);
3398         kfree(rbd_opts);
3399         rbd_spec_put(spec);
3400 err_out_module:
3401         module_put(THIS_MODULE);
3402
3403         dout("Error adding device %s\n", buf);
3404
3405         return (ssize_t) rc;
3406 }
3407
3408 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3409 {
3410         struct list_head *tmp;
3411         struct rbd_device *rbd_dev;
3412
3413         spin_lock(&rbd_dev_list_lock);
3414         list_for_each(tmp, &rbd_dev_list) {
3415                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3416                 if (rbd_dev->dev_id == dev_id) {
3417                         spin_unlock(&rbd_dev_list_lock);
3418                         return rbd_dev;
3419                 }
3420         }
3421         spin_unlock(&rbd_dev_list_lock);
3422         return NULL;
3423 }
3424
3425 static void rbd_dev_release(struct device *dev)
3426 {
3427         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3428
3429         if (rbd_dev->watch_request) {
3430                 struct ceph_client *client = rbd_dev->rbd_client->client;
3431
3432                 ceph_osdc_unregister_linger_request(&client->osdc,
3433                                                     rbd_dev->watch_request);
3434         }
3435         if (rbd_dev->watch_event)
3436                 rbd_req_sync_unwatch(rbd_dev);
3437
3438
3439         /* clean up and free blkdev */
3440         rbd_free_disk(rbd_dev);
3441         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3442
3443         /* release allocated disk header fields */
3444         rbd_header_free(&rbd_dev->header);
3445
3446         /* done with the id, and with the rbd_dev */
3447         rbd_dev_id_put(rbd_dev);
3448         rbd_assert(rbd_dev->rbd_client != NULL);
3449         rbd_dev_destroy(rbd_dev);
3450
3451         /* release module ref */
3452         module_put(THIS_MODULE);
3453 }
3454
3455 static ssize_t rbd_remove(struct bus_type *bus,
3456                           const char *buf,
3457                           size_t count)
3458 {
3459         struct rbd_device *rbd_dev = NULL;
3460         int target_id, rc;
3461         unsigned long ul;
3462         int ret = count;
3463
3464         rc = strict_strtoul(buf, 10, &ul);
3465         if (rc)
3466                 return rc;
3467
3468         /* convert to int; abort if we lost anything in the conversion */
3469         target_id = (int) ul;
3470         if (target_id != ul)
3471                 return -EINVAL;
3472
3473         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3474
3475         rbd_dev = __rbd_get_dev(target_id);
3476         if (!rbd_dev) {
3477                 ret = -ENOENT;
3478                 goto done;
3479         }
3480
3481         rbd_remove_all_snaps(rbd_dev);
3482         rbd_bus_del_dev(rbd_dev);
3483
3484 done:
3485         mutex_unlock(&ctl_mutex);
3486
3487         return ret;
3488 }
3489
3490 /*
3491  * create control files in sysfs
3492  * /sys/bus/rbd/...
3493  */
3494 static int rbd_sysfs_init(void)
3495 {
3496         int ret;
3497
3498         ret = device_register(&rbd_root_dev);
3499         if (ret < 0)
3500                 return ret;
3501
3502         ret = bus_register(&rbd_bus_type);
3503         if (ret < 0)
3504                 device_unregister(&rbd_root_dev);
3505
3506         return ret;
3507 }
3508
3509 static void rbd_sysfs_cleanup(void)
3510 {
3511         bus_unregister(&rbd_bus_type);
3512         device_unregister(&rbd_root_dev);
3513 }
3514
3515 int __init rbd_init(void)
3516 {
3517         int rc;
3518
3519         rc = rbd_sysfs_init();
3520         if (rc)
3521                 return rc;
3522         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3523         return 0;
3524 }
3525
3526 void __exit rbd_exit(void)
3527 {
3528         rbd_sysfs_cleanup();
3529 }
3530
/* Module entry and exit points */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");