]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
4734446c3b5b12d0f9862b6989956997cbcd7680
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Block I/O is expressed in sectors.  Several Linux layers (blk, bio,
 * genhd) interpret the term, but the default is universally 512
 * bytes.  These names are merely more readable than bare numbers.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Largest u64 value; it might be useful to have this defined elsewhere */
#define U64_MAX ((u64) (~0ULL))

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Limits on snapshot names/counts and on the option string */
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)

/*
 * Device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer identifier.  MAX_INT_FORMAT_WIDTH is used to make
 * sure DEV_NAME_LEN is large enough for every possible name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Mappings are writable unless an option says otherwise */
#define RBD_READ_ONLY_DEFAULT		false
91
92 /*
93  * block device image metadata (in-memory version)
94  */
95 struct rbd_image_header {
96         /* These four fields never change for a given rbd image */
97         char *object_prefix;
98         u64 features;
99         __u8 obj_order;
100         __u8 crypt_type;
101         __u8 comp_type;
102
103         /* The remaining fields need to be updated occasionally */
104         u64 image_size;
105         struct ceph_snap_context *snapc;
106         char *snap_names;
107         u64 *snap_sizes;
108
109         u64 obj_version;
110 };
111
/*
 * Options specific to rbd, filled in while the option string is parsed.
 */
struct rbd_options {
	bool read_only;		/* set by "ro"/"read_only", cleared by "rw" */
};
115
116 /*
117  * an instance of the client.  multiple devices may share an rbd client.
118  */
119 struct rbd_client {
120         struct ceph_client      *client;
121         struct kref             kref;
122         struct list_head        node;
123 };
124
125 /*
126  * a request completion status
127  */
128 struct rbd_req_status {
129         int done;
130         int rc;
131         u64 bytes;
132 };
133
134 /*
135  * a collection of requests
136  */
137 struct rbd_req_coll {
138         int                     total;
139         int                     num_done;
140         struct kref             kref;
141         struct rbd_req_status   status[0];
142 };
143
144 /*
145  * a single io request
146  */
147 struct rbd_request {
148         struct request          *rq;            /* blk layer request */
149         struct bio              *bio;           /* cloned bio */
150         struct page             **pages;        /* list of used pages */
151         u64                     len;
152         int                     coll_index;
153         struct rbd_req_coll     *coll;
154 };
155
156 struct rbd_snap {
157         struct  device          dev;
158         const char              *name;
159         u64                     size;
160         struct list_head        node;
161         u64                     id;
162         u64                     features;
163 };
164
165 struct rbd_mapping {
166         char                    *snap_name;
167         u64                     snap_id;
168         u64                     size;
169         u64                     features;
170         bool                    snap_exists;
171         bool                    read_only;
172 };
173
174 /*
175  * a single device
176  */
177 struct rbd_device {
178         int                     dev_id;         /* blkdev unique id */
179
180         int                     major;          /* blkdev assigned major */
181         struct gendisk          *disk;          /* blkdev's gendisk and rq */
182
183         u32                     image_format;   /* Either 1 or 2 */
184         struct rbd_options      rbd_opts;
185         struct rbd_client       *rbd_client;
186
187         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
188
189         spinlock_t              lock;           /* queue lock */
190
191         struct rbd_image_header header;
192         char                    *image_id;
193         size_t                  image_id_len;
194         char                    *image_name;
195         size_t                  image_name_len;
196         char                    *header_name;
197         char                    *pool_name;
198         int                     pool_id;
199
200         struct ceph_osd_event   *watch_event;
201         struct ceph_osd_request *watch_request;
202
203         /* protects updating the header */
204         struct rw_semaphore     header_rwsem;
205
206         struct rbd_mapping      mapping;
207
208         struct list_head        node;
209
210         /* list of snapshots */
211         struct list_head        snaps;
212
213         /* sysfs related */
214         struct device           dev;
215 };
216
217 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
218
219 static LIST_HEAD(rbd_dev_list);    /* devices */
220 static DEFINE_SPINLOCK(rbd_dev_list_lock);
221
222 static LIST_HEAD(rbd_client_list);              /* clients */
223 static DEFINE_SPINLOCK(rbd_client_list_lock);
224
225 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
226 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
227
228 static void rbd_dev_release(struct device *dev);
229 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
230
231 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
232                        size_t count);
233 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
234                           size_t count);
235
236 static struct bus_attribute rbd_bus_attrs[] = {
237         __ATTR(add, S_IWUSR, NULL, rbd_add),
238         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
239         __ATTR_NULL
240 };
241
242 static struct bus_type rbd_bus_type = {
243         .name           = "rbd",
244         .bus_attrs      = rbd_bus_attrs,
245 };
246
247 static void rbd_root_dev_release(struct device *dev)
248 {
249 }
250
251 static struct device rbd_root_dev = {
252         .init_name =    "rbd",
253         .release =      rbd_root_dev_release,
254 };
255
/*
 * rbd_assert(expr) - with RBD_DEBUG defined, report a failed
 * assertion (function, line and the expression text) and BUG();
 * otherwise expand to a no-op.
 *
 * The debug variant is wrapped in do { } while (0) so it behaves as
 * one statement: "if (x) rbd_assert(y); else ..." parses as written
 * instead of breaking on (or capturing) the else branch.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
268
269 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
270 {
271         return get_device(&rbd_dev->dev);
272 }
273
274 static void rbd_put_dev(struct rbd_device *rbd_dev)
275 {
276         put_device(&rbd_dev->dev);
277 }
278
279 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
280 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
281
282 static int rbd_open(struct block_device *bdev, fmode_t mode)
283 {
284         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
285
286         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
287                 return -EROFS;
288
289         rbd_get_dev(rbd_dev);
290         set_device_ro(bdev, rbd_dev->mapping.read_only);
291
292         return 0;
293 }
294
295 static int rbd_release(struct gendisk *disk, fmode_t mode)
296 {
297         struct rbd_device *rbd_dev = disk->private_data;
298
299         rbd_put_dev(rbd_dev);
300
301         return 0;
302 }
303
304 static const struct block_device_operations rbd_bd_ops = {
305         .owner                  = THIS_MODULE,
306         .open                   = rbd_open,
307         .release                = rbd_release,
308 };
309
310 /*
311  * Initialize an rbd client instance.
312  * We own *ceph_opts.
313  */
314 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
315 {
316         struct rbd_client *rbdc;
317         int ret = -ENOMEM;
318
319         dout("rbd_client_create\n");
320         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
321         if (!rbdc)
322                 goto out_opt;
323
324         kref_init(&rbdc->kref);
325         INIT_LIST_HEAD(&rbdc->node);
326
327         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
328
329         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
330         if (IS_ERR(rbdc->client))
331                 goto out_mutex;
332         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
333
334         ret = ceph_open_session(rbdc->client);
335         if (ret < 0)
336                 goto out_err;
337
338         spin_lock(&rbd_client_list_lock);
339         list_add_tail(&rbdc->node, &rbd_client_list);
340         spin_unlock(&rbd_client_list_lock);
341
342         mutex_unlock(&ctl_mutex);
343
344         dout("rbd_client_create created %p\n", rbdc);
345         return rbdc;
346
347 out_err:
348         ceph_destroy_client(rbdc->client);
349 out_mutex:
350         mutex_unlock(&ctl_mutex);
351         kfree(rbdc);
352 out_opt:
353         if (ceph_opts)
354                 ceph_destroy_options(ceph_opts);
355         return ERR_PTR(ret);
356 }
357
358 /*
359  * Find a ceph client with specific addr and configuration.  If
360  * found, bump its reference count.
361  */
362 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
363 {
364         struct rbd_client *client_node;
365         bool found = false;
366
367         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
368                 return NULL;
369
370         spin_lock(&rbd_client_list_lock);
371         list_for_each_entry(client_node, &rbd_client_list, node) {
372                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
373                         kref_get(&client_node->kref);
374                         found = true;
375                         break;
376                 }
377         }
378         spin_unlock(&rbd_client_list_lock);
379
380         return found ? client_node : NULL;
381 }
382
383 /*
384  * mount options
385  */
386 enum {
387         Opt_last_int,
388         /* int args above */
389         Opt_last_string,
390         /* string args above */
391         Opt_read_only,
392         Opt_read_write,
393         /* Boolean args above */
394         Opt_last_bool,
395 };
396
397 static match_table_t rbd_opts_tokens = {
398         /* int args above */
399         /* string args above */
400         {Opt_read_only, "read_only"},
401         {Opt_read_only, "ro"},          /* Alternate spelling */
402         {Opt_read_write, "read_write"},
403         {Opt_read_write, "rw"},         /* Alternate spelling */
404         /* Boolean args above */
405         {-1, NULL}
406 };
407
408 static int parse_rbd_opts_token(char *c, void *private)
409 {
410         struct rbd_options *rbd_opts = private;
411         substring_t argstr[MAX_OPT_ARGS];
412         int token, intval, ret;
413
414         token = match_token(c, rbd_opts_tokens, argstr);
415         if (token < 0)
416                 return -EINVAL;
417
418         if (token < Opt_last_int) {
419                 ret = match_int(&argstr[0], &intval);
420                 if (ret < 0) {
421                         pr_err("bad mount option arg (not int) "
422                                "at '%s'\n", c);
423                         return ret;
424                 }
425                 dout("got int token %d val %d\n", token, intval);
426         } else if (token > Opt_last_int && token < Opt_last_string) {
427                 dout("got string token %d val %s\n", token,
428                      argstr[0].from);
429         } else if (token > Opt_last_string && token < Opt_last_bool) {
430                 dout("got Boolean token %d\n", token);
431         } else {
432                 dout("got token %d\n", token);
433         }
434
435         switch (token) {
436         case Opt_read_only:
437                 rbd_opts->read_only = true;
438                 break;
439         case Opt_read_write:
440                 rbd_opts->read_only = false;
441                 break;
442         default:
443                 rbd_assert(false);
444                 break;
445         }
446         return 0;
447 }
448
449 /*
450  * Get a ceph client with specific addr and configuration, if one does
451  * not exist create it.
452  */
453 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
454                                 size_t mon_addr_len, char *options)
455 {
456         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
457         struct ceph_options *ceph_opts;
458         struct rbd_client *rbdc;
459
460         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
461
462         ceph_opts = ceph_parse_options(options, mon_addr,
463                                         mon_addr + mon_addr_len,
464                                         parse_rbd_opts_token, rbd_opts);
465         if (IS_ERR(ceph_opts))
466                 return PTR_ERR(ceph_opts);
467
468         rbdc = rbd_client_find(ceph_opts);
469         if (rbdc) {
470                 /* using an existing client */
471                 ceph_destroy_options(ceph_opts);
472         } else {
473                 rbdc = rbd_client_create(ceph_opts);
474                 if (IS_ERR(rbdc))
475                         return PTR_ERR(rbdc);
476         }
477         rbd_dev->rbd_client = rbdc;
478
479         return 0;
480 }
481
482 /*
483  * Destroy ceph client
484  *
485  * Caller must hold rbd_client_list_lock.
486  */
487 static void rbd_client_release(struct kref *kref)
488 {
489         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
490
491         dout("rbd_release_client %p\n", rbdc);
492         spin_lock(&rbd_client_list_lock);
493         list_del(&rbdc->node);
494         spin_unlock(&rbd_client_list_lock);
495
496         ceph_destroy_client(rbdc->client);
497         kfree(rbdc);
498 }
499
500 /*
501  * Drop reference to ceph client node. If it's not referenced anymore, release
502  * it.
503  */
504 static void rbd_put_client(struct rbd_device *rbd_dev)
505 {
506         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
507         rbd_dev->rbd_client = NULL;
508 }
509
510 /*
511  * Destroy requests collection
512  */
513 static void rbd_coll_release(struct kref *kref)
514 {
515         struct rbd_req_coll *coll =
516                 container_of(kref, struct rbd_req_coll, kref);
517
518         dout("rbd_coll_release %p\n", coll);
519         kfree(coll);
520 }
521
522 static bool rbd_image_format_valid(u32 image_format)
523 {
524         return image_format == 1 || image_format == 2;
525 }
526
527 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
528 {
529         size_t size;
530         u32 snap_count;
531
532         /* The header has to start with the magic rbd header text */
533         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
534                 return false;
535
536         /* The bio layer requires at least sector-sized I/O */
537
538         if (ondisk->options.order < SECTOR_SHIFT)
539                 return false;
540
541         /* If we use u64 in a few spots we may be able to loosen this */
542
543         if (ondisk->options.order > 8 * sizeof (int) - 1)
544                 return false;
545
546         /*
547          * The size of a snapshot header has to fit in a size_t, and
548          * that limits the number of snapshots.
549          */
550         snap_count = le32_to_cpu(ondisk->snap_count);
551         size = SIZE_MAX - sizeof (struct ceph_snap_context);
552         if (snap_count > size / sizeof (__le64))
553                 return false;
554
555         /*
556          * Not only that, but the size of the entire the snapshot
557          * header must also be representable in a size_t.
558          */
559         size -= snap_count * sizeof (__le64);
560         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
561                 return false;
562
563         return true;
564 }
565
566 /*
567  * Create a new header structure, translate header format from the on-disk
568  * header.
569  */
570 static int rbd_header_from_disk(struct rbd_image_header *header,
571                                  struct rbd_image_header_ondisk *ondisk)
572 {
573         u32 snap_count;
574         size_t len;
575         size_t size;
576         u32 i;
577
578         memset(header, 0, sizeof (*header));
579
580         snap_count = le32_to_cpu(ondisk->snap_count);
581
582         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
583         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
584         if (!header->object_prefix)
585                 return -ENOMEM;
586         memcpy(header->object_prefix, ondisk->object_prefix, len);
587         header->object_prefix[len] = '\0';
588
589         if (snap_count) {
590                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
591
592                 /* Save a copy of the snapshot names */
593
594                 if (snap_names_len > (u64) SIZE_MAX)
595                         return -EIO;
596                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
597                 if (!header->snap_names)
598                         goto out_err;
599                 /*
600                  * Note that rbd_dev_v1_header_read() guarantees
601                  * the ondisk buffer we're working with has
602                  * snap_names_len bytes beyond the end of the
603                  * snapshot id array, this memcpy() is safe.
604                  */
605                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
606                         snap_names_len);
607
608                 /* Record each snapshot's size */
609
610                 size = snap_count * sizeof (*header->snap_sizes);
611                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
612                 if (!header->snap_sizes)
613                         goto out_err;
614                 for (i = 0; i < snap_count; i++)
615                         header->snap_sizes[i] =
616                                 le64_to_cpu(ondisk->snaps[i].image_size);
617         } else {
618                 WARN_ON(ondisk->snap_names_len);
619                 header->snap_names = NULL;
620                 header->snap_sizes = NULL;
621         }
622
623         header->features = 0;   /* No features support in v1 images */
624         header->obj_order = ondisk->options.order;
625         header->crypt_type = ondisk->options.crypt_type;
626         header->comp_type = ondisk->options.comp_type;
627
628         /* Allocate and fill in the snapshot context */
629
630         header->image_size = le64_to_cpu(ondisk->image_size);
631         size = sizeof (struct ceph_snap_context);
632         size += snap_count * sizeof (header->snapc->snaps[0]);
633         header->snapc = kzalloc(size, GFP_KERNEL);
634         if (!header->snapc)
635                 goto out_err;
636
637         atomic_set(&header->snapc->nref, 1);
638         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
639         header->snapc->num_snaps = snap_count;
640         for (i = 0; i < snap_count; i++)
641                 header->snapc->snaps[i] =
642                         le64_to_cpu(ondisk->snaps[i].id);
643
644         return 0;
645
646 out_err:
647         kfree(header->snap_sizes);
648         header->snap_sizes = NULL;
649         kfree(header->snap_names);
650         header->snap_names = NULL;
651         kfree(header->object_prefix);
652         header->object_prefix = NULL;
653
654         return -ENOMEM;
655 }
656
657 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
658 {
659
660         struct rbd_snap *snap;
661
662         list_for_each_entry(snap, &rbd_dev->snaps, node) {
663                 if (!strcmp(snap_name, snap->name)) {
664                         rbd_dev->mapping.snap_id = snap->id;
665                         rbd_dev->mapping.size = snap->size;
666                         rbd_dev->mapping.features = snap->features;
667
668                         return 0;
669                 }
670         }
671
672         return -ENOENT;
673 }
674
675 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
676 {
677         int ret;
678
679         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
680                     sizeof (RBD_SNAP_HEAD_NAME))) {
681                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
682                 rbd_dev->mapping.size = rbd_dev->header.image_size;
683                 rbd_dev->mapping.features = rbd_dev->header.features;
684                 rbd_dev->mapping.snap_exists = false;
685                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
686                 ret = 0;
687         } else {
688                 ret = snap_by_name(rbd_dev, snap_name);
689                 if (ret < 0)
690                         goto done;
691                 rbd_dev->mapping.snap_exists = true;
692                 rbd_dev->mapping.read_only = true;
693         }
694         rbd_dev->mapping.snap_name = snap_name;
695 done:
696         return ret;
697 }
698
699 static void rbd_header_free(struct rbd_image_header *header)
700 {
701         kfree(header->object_prefix);
702         header->object_prefix = NULL;
703         kfree(header->snap_sizes);
704         header->snap_sizes = NULL;
705         kfree(header->snap_names);
706         header->snap_names = NULL;
707         ceph_put_snap_context(header->snapc);
708         header->snapc = NULL;
709 }
710
711 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
712 {
713         char *name;
714         u64 segment;
715         int ret;
716
717         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
718         if (!name)
719                 return NULL;
720         segment = offset >> rbd_dev->header.obj_order;
721         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
722                         rbd_dev->header.object_prefix, segment);
723         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
724                 pr_err("error formatting segment name for #%llu (%d)\n",
725                         segment, ret);
726                 kfree(name);
727                 name = NULL;
728         }
729
730         return name;
731 }
732
733 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
734 {
735         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
736
737         return offset & (segment_size - 1);
738 }
739
740 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
741                                 u64 offset, u64 length)
742 {
743         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
744
745         offset &= segment_size - 1;
746
747         rbd_assert(length <= U64_MAX - offset);
748         if (offset + length > segment_size)
749                 length = segment_size - offset;
750
751         return length;
752 }
753
754 static int rbd_get_num_segments(struct rbd_image_header *header,
755                                 u64 ofs, u64 len)
756 {
757         u64 start_seg;
758         u64 end_seg;
759
760         if (!len)
761                 return 0;
762         if (len - 1 > U64_MAX - ofs)
763                 return -ERANGE;
764
765         start_seg = ofs >> header->obj_order;
766         end_seg = (ofs + len - 1) >> header->obj_order;
767
768         return end_seg - start_seg + 1;
769 }
770
771 /*
772  * returns the size of an object in the image
773  */
774 static u64 rbd_obj_bytes(struct rbd_image_header *header)
775 {
776         return 1 << header->obj_order;
777 }
778
779 /*
780  * bio helpers
781  */
782
783 static void bio_chain_put(struct bio *chain)
784 {
785         struct bio *tmp;
786
787         while (chain) {
788                 tmp = chain;
789                 chain = chain->bi_next;
790                 bio_put(tmp);
791         }
792 }
793
794 /*
795  * zeros a bio chain, starting at specific offset
796  */
797 static void zero_bio_chain(struct bio *chain, int start_ofs)
798 {
799         struct bio_vec *bv;
800         unsigned long flags;
801         void *buf;
802         int i;
803         int pos = 0;
804
805         while (chain) {
806                 bio_for_each_segment(bv, chain, i) {
807                         if (pos + bv->bv_len > start_ofs) {
808                                 int remainder = max(start_ofs - pos, 0);
809                                 buf = bvec_kmap_irq(bv, &flags);
810                                 memset(buf + remainder, 0,
811                                        bv->bv_len - remainder);
812                                 bvec_kunmap_irq(buf, &flags);
813                         }
814                         pos += bv->bv_len;
815                 }
816
817                 chain = chain->bi_next;
818         }
819 }
820
821 /*
822  * bio_chain_clone - clone a chain of bios up to a certain length.
823  * might return a bio_pair that will need to be released.
824  */
825 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
826                                    struct bio_pair **bp,
827                                    int len, gfp_t gfpmask)
828 {
829         struct bio *old_chain = *old;
830         struct bio *new_chain = NULL;
831         struct bio *tail;
832         int total = 0;
833
834         if (*bp) {
835                 bio_pair_release(*bp);
836                 *bp = NULL;
837         }
838
839         while (old_chain && (total < len)) {
840                 struct bio *tmp;
841
842                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
843                 if (!tmp)
844                         goto err_out;
845                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
846
847                 if (total + old_chain->bi_size > len) {
848                         struct bio_pair *bp;
849
850                         /*
851                          * this split can only happen with a single paged bio,
852                          * split_bio will BUG_ON if this is not the case
853                          */
854                         dout("bio_chain_clone split! total=%d remaining=%d"
855                              "bi_size=%u\n",
856                              total, len - total, old_chain->bi_size);
857
858                         /* split the bio. We'll release it either in the next
859                            call, or it will have to be released outside */
860                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
861                         if (!bp)
862                                 goto err_out;
863
864                         __bio_clone(tmp, &bp->bio1);
865
866                         *next = &bp->bio2;
867                 } else {
868                         __bio_clone(tmp, old_chain);
869                         *next = old_chain->bi_next;
870                 }
871
872                 tmp->bi_bdev = NULL;
873                 tmp->bi_next = NULL;
874                 if (new_chain)
875                         tail->bi_next = tmp;
876                 else
877                         new_chain = tmp;
878                 tail = tmp;
879                 old_chain = old_chain->bi_next;
880
881                 total += tmp->bi_size;
882         }
883
884         rbd_assert(total == len);
885
886         *old = old_chain;
887
888         return new_chain;
889
890 err_out:
891         dout("bio_chain_clone with err\n");
892         bio_chain_put(new_chain);
893         return NULL;
894 }
895
896 /*
897  * helpers for osd request op vectors.
898  */
899 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
900                                         int opcode, u32 payload_len)
901 {
902         struct ceph_osd_req_op *ops;
903
904         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
905         if (!ops)
906                 return NULL;
907
908         ops[0].op = opcode;
909
910         /*
911          * op extent offset and length will be set later on
912          * in calc_raw_layout()
913          */
914         ops[0].payload_len = payload_len;
915
916         return ops;
917 }
918
/*
 * Free an op vector obtained from rbd_create_rw_ops().
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
923
924 static void rbd_coll_end_req_index(struct request *rq,
925                                    struct rbd_req_coll *coll,
926                                    int index,
927                                    int ret, u64 len)
928 {
929         struct request_queue *q;
930         int min, max, i;
931
932         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
933              coll, index, ret, (unsigned long long) len);
934
935         if (!rq)
936                 return;
937
938         if (!coll) {
939                 blk_end_request(rq, ret, len);
940                 return;
941         }
942
943         q = rq->q;
944
945         spin_lock_irq(q->queue_lock);
946         coll->status[index].done = 1;
947         coll->status[index].rc = ret;
948         coll->status[index].bytes = len;
949         max = min = coll->num_done;
950         while (max < coll->total && coll->status[max].done)
951                 max++;
952
953         for (i = min; i<max; i++) {
954                 __blk_end_request(rq, coll->status[i].rc,
955                                   coll->status[i].bytes);
956                 coll->num_done++;
957                 kref_put(&coll->kref, rbd_coll_release);
958         }
959         spin_unlock_irq(q->queue_lock);
960 }
961
962 static void rbd_coll_end_req(struct rbd_request *req,
963                              int ret, u64 len)
964 {
965         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
966 }
967
968 /*
969  * Send ceph osd request
970  */
971 static int rbd_do_request(struct request *rq,
972                           struct rbd_device *rbd_dev,
973                           struct ceph_snap_context *snapc,
974                           u64 snapid,
975                           const char *object_name, u64 ofs, u64 len,
976                           struct bio *bio,
977                           struct page **pages,
978                           int num_pages,
979                           int flags,
980                           struct ceph_osd_req_op *ops,
981                           struct rbd_req_coll *coll,
982                           int coll_index,
983                           void (*rbd_cb)(struct ceph_osd_request *req,
984                                          struct ceph_msg *msg),
985                           struct ceph_osd_request **linger_req,
986                           u64 *ver)
987 {
988         struct ceph_osd_request *req;
989         struct ceph_file_layout *layout;
990         int ret;
991         u64 bno;
992         struct timespec mtime = CURRENT_TIME;
993         struct rbd_request *req_data;
994         struct ceph_osd_request_head *reqhead;
995         struct ceph_osd_client *osdc;
996
997         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
998         if (!req_data) {
999                 if (coll)
1000                         rbd_coll_end_req_index(rq, coll, coll_index,
1001                                                -ENOMEM, len);
1002                 return -ENOMEM;
1003         }
1004
1005         if (coll) {
1006                 req_data->coll = coll;
1007                 req_data->coll_index = coll_index;
1008         }
1009
1010         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
1011                 (unsigned long long) ofs, (unsigned long long) len);
1012
1013         osdc = &rbd_dev->rbd_client->client->osdc;
1014         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1015                                         false, GFP_NOIO, pages, bio);
1016         if (!req) {
1017                 ret = -ENOMEM;
1018                 goto done_pages;
1019         }
1020
1021         req->r_callback = rbd_cb;
1022
1023         req_data->rq = rq;
1024         req_data->bio = bio;
1025         req_data->pages = pages;
1026         req_data->len = len;
1027
1028         req->r_priv = req_data;
1029
1030         reqhead = req->r_request->front.iov_base;
1031         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1032
1033         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1034         req->r_oid_len = strlen(req->r_oid);
1035
1036         layout = &req->r_file_layout;
1037         memset(layout, 0, sizeof(*layout));
1038         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1039         layout->fl_stripe_count = cpu_to_le32(1);
1040         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1041         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1042         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1043                                    req, ops);
1044         rbd_assert(ret == 0);
1045
1046         ceph_osdc_build_request(req, ofs, &len,
1047                                 ops,
1048                                 snapc,
1049                                 &mtime,
1050                                 req->r_oid, req->r_oid_len);
1051
1052         if (linger_req) {
1053                 ceph_osdc_set_request_linger(osdc, req);
1054                 *linger_req = req;
1055         }
1056
1057         ret = ceph_osdc_start_request(osdc, req, false);
1058         if (ret < 0)
1059                 goto done_err;
1060
1061         if (!rbd_cb) {
1062                 ret = ceph_osdc_wait_request(osdc, req);
1063                 if (ver)
1064                         *ver = le64_to_cpu(req->r_reassert_version.version);
1065                 dout("reassert_ver=%llu\n",
1066                         (unsigned long long)
1067                                 le64_to_cpu(req->r_reassert_version.version));
1068                 ceph_osdc_put_request(req);
1069         }
1070         return ret;
1071
1072 done_err:
1073         bio_chain_put(req_data->bio);
1074         ceph_osdc_put_request(req);
1075 done_pages:
1076         rbd_coll_end_req(req_data, ret, len);
1077         kfree(req_data);
1078         return ret;
1079 }
1080
1081 /*
1082  * Ceph osd op callback
1083  */
1084 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1085 {
1086         struct rbd_request *req_data = req->r_priv;
1087         struct ceph_osd_reply_head *replyhead;
1088         struct ceph_osd_op *op;
1089         __s32 rc;
1090         u64 bytes;
1091         int read_op;
1092
1093         /* parse reply */
1094         replyhead = msg->front.iov_base;
1095         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1096         op = (void *)(replyhead + 1);
1097         rc = le32_to_cpu(replyhead->result);
1098         bytes = le64_to_cpu(op->extent.length);
1099         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1100
1101         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1102                 (unsigned long long) bytes, read_op, (int) rc);
1103
1104         if (rc == -ENOENT && read_op) {
1105                 zero_bio_chain(req_data->bio, 0);
1106                 rc = 0;
1107         } else if (rc == 0 && read_op && bytes < req_data->len) {
1108                 zero_bio_chain(req_data->bio, bytes);
1109                 bytes = req_data->len;
1110         }
1111
1112         rbd_coll_end_req(req_data, rc, bytes);
1113
1114         if (req_data->bio)
1115                 bio_chain_put(req_data->bio);
1116
1117         ceph_osdc_put_request(req);
1118         kfree(req_data);
1119 }
1120
1121 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1122 {
1123         ceph_osdc_put_request(req);
1124 }
1125
1126 /*
1127  * Do a synchronous ceph osd operation
1128  */
1129 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1130                            struct ceph_snap_context *snapc,
1131                            u64 snapid,
1132                            int flags,
1133                            struct ceph_osd_req_op *ops,
1134                            const char *object_name,
1135                            u64 ofs, u64 inbound_size,
1136                            char *inbound,
1137                            struct ceph_osd_request **linger_req,
1138                            u64 *ver)
1139 {
1140         int ret;
1141         struct page **pages;
1142         int num_pages;
1143
1144         rbd_assert(ops != NULL);
1145
1146         num_pages = calc_pages_for(ofs, inbound_size);
1147         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1148         if (IS_ERR(pages))
1149                 return PTR_ERR(pages);
1150
1151         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1152                           object_name, ofs, inbound_size, NULL,
1153                           pages, num_pages,
1154                           flags,
1155                           ops,
1156                           NULL, 0,
1157                           NULL,
1158                           linger_req, ver);
1159         if (ret < 0)
1160                 goto done;
1161
1162         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1163                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1164
1165 done:
1166         ceph_release_page_vector(pages, num_pages);
1167         return ret;
1168 }
1169
1170 /*
1171  * Do an asynchronous ceph osd operation
1172  */
1173 static int rbd_do_op(struct request *rq,
1174                      struct rbd_device *rbd_dev,
1175                      struct ceph_snap_context *snapc,
1176                      u64 ofs, u64 len,
1177                      struct bio *bio,
1178                      struct rbd_req_coll *coll,
1179                      int coll_index)
1180 {
1181         char *seg_name;
1182         u64 seg_ofs;
1183         u64 seg_len;
1184         int ret;
1185         struct ceph_osd_req_op *ops;
1186         u32 payload_len;
1187         int opcode;
1188         int flags;
1189         u64 snapid;
1190
1191         seg_name = rbd_segment_name(rbd_dev, ofs);
1192         if (!seg_name)
1193                 return -ENOMEM;
1194         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1195         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1196
1197         if (rq_data_dir(rq) == WRITE) {
1198                 opcode = CEPH_OSD_OP_WRITE;
1199                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1200                 snapid = CEPH_NOSNAP;
1201                 payload_len = seg_len;
1202         } else {
1203                 opcode = CEPH_OSD_OP_READ;
1204                 flags = CEPH_OSD_FLAG_READ;
1205                 snapc = NULL;
1206                 snapid = rbd_dev->mapping.snap_id;
1207                 payload_len = 0;
1208         }
1209
1210         ret = -ENOMEM;
1211         ops = rbd_create_rw_ops(1, opcode, payload_len);
1212         if (!ops)
1213                 goto done;
1214
1215         /* we've taken care of segment sizes earlier when we
1216            cloned the bios. We should never have a segment
1217            truncated at this point */
1218         rbd_assert(seg_len == len);
1219
1220         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1221                              seg_name, seg_ofs, seg_len,
1222                              bio,
1223                              NULL, 0,
1224                              flags,
1225                              ops,
1226                              coll, coll_index,
1227                              rbd_req_cb, 0, NULL);
1228
1229         rbd_destroy_ops(ops);
1230 done:
1231         kfree(seg_name);
1232         return ret;
1233 }
1234
1235 /*
1236  * Request sync osd read
1237  */
1238 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1239                           u64 snapid,
1240                           const char *object_name,
1241                           u64 ofs, u64 len,
1242                           char *buf,
1243                           u64 *ver)
1244 {
1245         struct ceph_osd_req_op *ops;
1246         int ret;
1247
1248         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1249         if (!ops)
1250                 return -ENOMEM;
1251
1252         ret = rbd_req_sync_op(rbd_dev, NULL,
1253                                snapid,
1254                                CEPH_OSD_FLAG_READ,
1255                                ops, object_name, ofs, len, buf, NULL, ver);
1256         rbd_destroy_ops(ops);
1257
1258         return ret;
1259 }
1260
1261 /*
1262  * Request sync osd watch
1263  */
1264 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1265                                    u64 ver,
1266                                    u64 notify_id)
1267 {
1268         struct ceph_osd_req_op *ops;
1269         int ret;
1270
1271         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1272         if (!ops)
1273                 return -ENOMEM;
1274
1275         ops[0].watch.ver = cpu_to_le64(ver);
1276         ops[0].watch.cookie = notify_id;
1277         ops[0].watch.flag = 0;
1278
1279         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1280                           rbd_dev->header_name, 0, 0, NULL,
1281                           NULL, 0,
1282                           CEPH_OSD_FLAG_READ,
1283                           ops,
1284                           NULL, 0,
1285                           rbd_simple_req_cb, 0, NULL);
1286
1287         rbd_destroy_ops(ops);
1288         return ret;
1289 }
1290
1291 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1292 {
1293         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1294         u64 hver;
1295         int rc;
1296
1297         if (!rbd_dev)
1298                 return;
1299
1300         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1301                 rbd_dev->header_name, (unsigned long long) notify_id,
1302                 (unsigned int) opcode);
1303         rc = rbd_dev_refresh(rbd_dev, &hver);
1304         if (rc)
1305                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1306                            " update snaps: %d\n", rbd_dev->major, rc);
1307
1308         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1309 }
1310
1311 /*
1312  * Request sync osd watch
1313  */
1314 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1315 {
1316         struct ceph_osd_req_op *ops;
1317         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1318         int ret;
1319
1320         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1321         if (!ops)
1322                 return -ENOMEM;
1323
1324         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1325                                      (void *)rbd_dev, &rbd_dev->watch_event);
1326         if (ret < 0)
1327                 goto fail;
1328
1329         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1330         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1331         ops[0].watch.flag = 1;
1332
1333         ret = rbd_req_sync_op(rbd_dev, NULL,
1334                               CEPH_NOSNAP,
1335                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1336                               ops,
1337                               rbd_dev->header_name,
1338                               0, 0, NULL,
1339                               &rbd_dev->watch_request, NULL);
1340
1341         if (ret < 0)
1342                 goto fail_event;
1343
1344         rbd_destroy_ops(ops);
1345         return 0;
1346
1347 fail_event:
1348         ceph_osdc_cancel_event(rbd_dev->watch_event);
1349         rbd_dev->watch_event = NULL;
1350 fail:
1351         rbd_destroy_ops(ops);
1352         return ret;
1353 }
1354
1355 /*
1356  * Request sync osd unwatch
1357  */
1358 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1359 {
1360         struct ceph_osd_req_op *ops;
1361         int ret;
1362
1363         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1364         if (!ops)
1365                 return -ENOMEM;
1366
1367         ops[0].watch.ver = 0;
1368         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1369         ops[0].watch.flag = 0;
1370
1371         ret = rbd_req_sync_op(rbd_dev, NULL,
1372                               CEPH_NOSNAP,
1373                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1374                               ops,
1375                               rbd_dev->header_name,
1376                               0, 0, NULL, NULL, NULL);
1377
1378
1379         rbd_destroy_ops(ops);
1380         ceph_osdc_cancel_event(rbd_dev->watch_event);
1381         rbd_dev->watch_event = NULL;
1382         return ret;
1383 }
1384
1385 /*
1386  * Synchronous osd object method call
1387  */
1388 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1389                              const char *object_name,
1390                              const char *class_name,
1391                              const char *method_name,
1392                              const char *outbound,
1393                              size_t outbound_size,
1394                              char *inbound,
1395                              size_t inbound_size,
1396                              int flags,
1397                              u64 *ver)
1398 {
1399         struct ceph_osd_req_op *ops;
1400         int class_name_len = strlen(class_name);
1401         int method_name_len = strlen(method_name);
1402         int payload_size;
1403         int ret;
1404
1405         /*
1406          * Any input parameters required by the method we're calling
1407          * will be sent along with the class and method names as
1408          * part of the message payload.  That data and its size are
1409          * supplied via the indata and indata_len fields (named from
1410          * the perspective of the server side) in the OSD request
1411          * operation.
1412          */
1413         payload_size = class_name_len + method_name_len + outbound_size;
1414         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1415         if (!ops)
1416                 return -ENOMEM;
1417
1418         ops[0].cls.class_name = class_name;
1419         ops[0].cls.class_len = (__u8) class_name_len;
1420         ops[0].cls.method_name = method_name;
1421         ops[0].cls.method_len = (__u8) method_name_len;
1422         ops[0].cls.argc = 0;
1423         ops[0].cls.indata = outbound;
1424         ops[0].cls.indata_len = outbound_size;
1425
1426         ret = rbd_req_sync_op(rbd_dev, NULL,
1427                                CEPH_NOSNAP,
1428                                flags, ops,
1429                                object_name, 0, inbound_size, inbound,
1430                                NULL, ver);
1431
1432         rbd_destroy_ops(ops);
1433
1434         dout("cls_exec returned %d\n", ret);
1435         return ret;
1436 }
1437
1438 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1439 {
1440         struct rbd_req_coll *coll =
1441                         kzalloc(sizeof(struct rbd_req_coll) +
1442                                 sizeof(struct rbd_req_status) * num_reqs,
1443                                 GFP_ATOMIC);
1444
1445         if (!coll)
1446                 return NULL;
1447         coll->total = num_reqs;
1448         kref_init(&coll->kref);
1449         return coll;
1450 }
1451
1452 /*
1453  * block device queue callback
1454  */
1455 static void rbd_rq_fn(struct request_queue *q)
1456 {
1457         struct rbd_device *rbd_dev = q->queuedata;
1458         struct request *rq;
1459         struct bio_pair *bp = NULL;
1460
1461         while ((rq = blk_fetch_request(q))) {
1462                 struct bio *bio;
1463                 struct bio *rq_bio, *next_bio = NULL;
1464                 bool do_write;
1465                 unsigned int size;
1466                 u64 op_size = 0;
1467                 u64 ofs;
1468                 int num_segs, cur_seg = 0;
1469                 struct rbd_req_coll *coll;
1470                 struct ceph_snap_context *snapc;
1471
1472                 dout("fetched request\n");
1473
1474                 /* filter out block requests we don't understand */
1475                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1476                         __blk_end_request_all(rq, 0);
1477                         continue;
1478                 }
1479
1480                 /* deduce our operation (read, write) */
1481                 do_write = (rq_data_dir(rq) == WRITE);
1482
1483                 size = blk_rq_bytes(rq);
1484                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1485                 rq_bio = rq->bio;
1486                 if (do_write && rbd_dev->mapping.read_only) {
1487                         __blk_end_request_all(rq, -EROFS);
1488                         continue;
1489                 }
1490
1491                 spin_unlock_irq(q->queue_lock);
1492
1493                 down_read(&rbd_dev->header_rwsem);
1494
1495                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1496                                 !rbd_dev->mapping.snap_exists) {
1497                         up_read(&rbd_dev->header_rwsem);
1498                         dout("request for non-existent snapshot");
1499                         spin_lock_irq(q->queue_lock);
1500                         __blk_end_request_all(rq, -ENXIO);
1501                         continue;
1502                 }
1503
1504                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1505
1506                 up_read(&rbd_dev->header_rwsem);
1507
1508                 dout("%s 0x%x bytes at 0x%llx\n",
1509                      do_write ? "write" : "read",
1510                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1511
1512                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1513                 if (num_segs <= 0) {
1514                         spin_lock_irq(q->queue_lock);
1515                         __blk_end_request_all(rq, num_segs);
1516                         ceph_put_snap_context(snapc);
1517                         continue;
1518                 }
1519                 coll = rbd_alloc_coll(num_segs);
1520                 if (!coll) {
1521                         spin_lock_irq(q->queue_lock);
1522                         __blk_end_request_all(rq, -ENOMEM);
1523                         ceph_put_snap_context(snapc);
1524                         continue;
1525                 }
1526
1527                 do {
1528                         /* a bio clone to be passed down to OSD req */
1529                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1530                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1531                         kref_get(&coll->kref);
1532                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1533                                               op_size, GFP_ATOMIC);
1534                         if (bio)
1535                                 (void) rbd_do_op(rq, rbd_dev, snapc,
1536                                                 ofs, op_size,
1537                                                 bio, coll, cur_seg);
1538                         else
1539                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1540                                                        -ENOMEM, op_size);
1541                         size -= op_size;
1542                         ofs += op_size;
1543
1544                         cur_seg++;
1545                         rq_bio = next_bio;
1546                 } while (size > 0);
1547                 kref_put(&coll->kref, rbd_coll_release);
1548
1549                 if (bp)
1550                         bio_pair_release(bp);
1551                 spin_lock_irq(q->queue_lock);
1552
1553                 ceph_put_snap_context(snapc);
1554         }
1555 }
1556
1557 /*
1558  * a queue callback. Makes sure that we don't create a bio that spans across
1559  * multiple osd objects. One exception would be with a single page bios,
1560  * which we handle later at bio_chain_clone
1561  */
1562 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1563                           struct bio_vec *bvec)
1564 {
1565         struct rbd_device *rbd_dev = q->queuedata;
1566         unsigned int chunk_sectors;
1567         sector_t sector;
1568         unsigned int bio_sectors;
1569         int max;
1570
1571         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1572         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1573         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1574
1575         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1576                                  + bio_sectors)) << SECTOR_SHIFT;
1577         if (max < 0)
1578                 max = 0; /* bio_add cannot handle a negative return */
1579         if (max <= bvec->bv_len && bio_sectors == 0)
1580                 return bvec->bv_len;
1581         return max;
1582 }
1583
1584 static void rbd_free_disk(struct rbd_device *rbd_dev)
1585 {
1586         struct gendisk *disk = rbd_dev->disk;
1587
1588         if (!disk)
1589                 return;
1590
1591         if (disk->flags & GENHD_FL_UP)
1592                 del_gendisk(disk);
1593         if (disk->queue)
1594                 blk_cleanup_queue(disk->queue);
1595         put_disk(disk);
1596 }
1597
1598 /*
1599  * Read the complete header for the given rbd device.
1600  *
1601  * Returns a pointer to a dynamically-allocated buffer containing
1602  * the complete and validated header.  Caller can pass the address
1603  * of a variable that will be filled in with the version of the
1604  * header object at the time it was read.
1605  *
1606  * Returns a pointer-coded errno if a failure occurs.
1607  */
1608 static struct rbd_image_header_ondisk *
1609 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1610 {
1611         struct rbd_image_header_ondisk *ondisk = NULL;
1612         u32 snap_count = 0;
1613         u64 names_size = 0;
1614         u32 want_count;
1615         int ret;
1616
1617         /*
1618          * The complete header will include an array of its 64-bit
1619          * snapshot ids, followed by the names of those snapshots as
1620          * a contiguous block of NUL-terminated strings.  Note that
1621          * the number of snapshots could change by the time we read
1622          * it in, in which case we re-read it.
1623          */
1624         do {
1625                 size_t size;
1626
1627                 kfree(ondisk);
1628
1629                 size = sizeof (*ondisk);
1630                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1631                 size += names_size;
1632                 ondisk = kmalloc(size, GFP_KERNEL);
1633                 if (!ondisk)
1634                         return ERR_PTR(-ENOMEM);
1635
1636                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1637                                        rbd_dev->header_name,
1638                                        0, size,
1639                                        (char *) ondisk, version);
1640
1641                 if (ret < 0)
1642                         goto out_err;
1643                 if (WARN_ON((size_t) ret < size)) {
1644                         ret = -ENXIO;
1645                         pr_warning("short header read for image %s"
1646                                         " (want %zd got %d)\n",
1647                                 rbd_dev->image_name, size, ret);
1648                         goto out_err;
1649                 }
1650                 if (!rbd_dev_ondisk_valid(ondisk)) {
1651                         ret = -ENXIO;
1652                         pr_warning("invalid header for image %s\n",
1653                                 rbd_dev->image_name);
1654                         goto out_err;
1655                 }
1656
1657                 names_size = le64_to_cpu(ondisk->snap_names_len);
1658                 want_count = snap_count;
1659                 snap_count = le32_to_cpu(ondisk->snap_count);
1660         } while (snap_count != want_count);
1661
1662         return ondisk;
1663
1664 out_err:
1665         kfree(ondisk);
1666
1667         return ERR_PTR(ret);
1668 }
1669
1670 /*
1671  * reload the ondisk the header
1672  */
1673 static int rbd_read_header(struct rbd_device *rbd_dev,
1674                            struct rbd_image_header *header)
1675 {
1676         struct rbd_image_header_ondisk *ondisk;
1677         u64 ver = 0;
1678         int ret;
1679
1680         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1681         if (IS_ERR(ondisk))
1682                 return PTR_ERR(ondisk);
1683         ret = rbd_header_from_disk(header, ondisk);
1684         if (ret >= 0)
1685                 header->obj_version = ver;
1686         kfree(ondisk);
1687
1688         return ret;
1689 }
1690
1691 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1692 {
1693         struct rbd_snap *snap;
1694         struct rbd_snap *next;
1695
1696         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1697                 __rbd_remove_snap_dev(snap);
1698 }
1699
1700 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1701 {
1702         sector_t size;
1703
1704         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1705                 return;
1706
1707         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1708         dout("setting size to %llu sectors", (unsigned long long) size);
1709         rbd_dev->mapping.size = (u64) size;
1710         set_capacity(rbd_dev->disk, size);
1711 }
1712
1713 /*
1714  * only read the first part of the ondisk header, without the snaps info
1715  */
1716 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1717 {
1718         int ret;
1719         struct rbd_image_header h;
1720
1721         ret = rbd_read_header(rbd_dev, &h);
1722         if (ret < 0)
1723                 return ret;
1724
1725         down_write(&rbd_dev->header_rwsem);
1726
1727         /* Update image size, and check for resize of mapped image */
1728         rbd_dev->header.image_size = h.image_size;
1729         rbd_update_mapping_size(rbd_dev);
1730
1731         /* rbd_dev->header.object_prefix shouldn't change */
1732         kfree(rbd_dev->header.snap_sizes);
1733         kfree(rbd_dev->header.snap_names);
1734         /* osd requests may still refer to snapc */
1735         ceph_put_snap_context(rbd_dev->header.snapc);
1736
1737         if (hver)
1738                 *hver = h.obj_version;
1739         rbd_dev->header.obj_version = h.obj_version;
1740         rbd_dev->header.image_size = h.image_size;
1741         rbd_dev->header.snapc = h.snapc;
1742         rbd_dev->header.snap_names = h.snap_names;
1743         rbd_dev->header.snap_sizes = h.snap_sizes;
1744         /* Free the extra copy of the object prefix */
1745         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1746         kfree(h.object_prefix);
1747
1748         ret = rbd_dev_snaps_update(rbd_dev);
1749         if (!ret)
1750                 ret = rbd_dev_snaps_register(rbd_dev);
1751
1752         up_write(&rbd_dev->header_rwsem);
1753
1754         return ret;
1755 }
1756
1757 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1758 {
1759         int ret;
1760
1761         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1762         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1763         if (rbd_dev->image_format == 1)
1764                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1765         else
1766                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1767         mutex_unlock(&ctl_mutex);
1768
1769         return ret;
1770 }
1771
1772 static int rbd_init_disk(struct rbd_device *rbd_dev)
1773 {
1774         struct gendisk *disk;
1775         struct request_queue *q;
1776         u64 segment_size;
1777
1778         /* create gendisk info */
1779         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1780         if (!disk)
1781                 return -ENOMEM;
1782
1783         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1784                  rbd_dev->dev_id);
1785         disk->major = rbd_dev->major;
1786         disk->first_minor = 0;
1787         disk->fops = &rbd_bd_ops;
1788         disk->private_data = rbd_dev;
1789
1790         /* init rq */
1791         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1792         if (!q)
1793                 goto out_disk;
1794
1795         /* We use the default size, but let's be explicit about it. */
1796         blk_queue_physical_block_size(q, SECTOR_SIZE);
1797
1798         /* set io sizes to object size */
1799         segment_size = rbd_obj_bytes(&rbd_dev->header);
1800         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1801         blk_queue_max_segment_size(q, segment_size);
1802         blk_queue_io_min(q, segment_size);
1803         blk_queue_io_opt(q, segment_size);
1804
1805         blk_queue_merge_bvec(q, rbd_merge_bvec);
1806         disk->queue = q;
1807
1808         q->queuedata = rbd_dev;
1809
1810         rbd_dev->disk = disk;
1811
1812         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1813
1814         return 0;
1815 out_disk:
1816         put_disk(disk);
1817
1818         return -ENOMEM;
1819 }
1820
1821 /*
1822   sysfs
1823 */
1824
1825 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1826 {
1827         return container_of(dev, struct rbd_device, dev);
1828 }
1829
1830 static ssize_t rbd_size_show(struct device *dev,
1831                              struct device_attribute *attr, char *buf)
1832 {
1833         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1834         sector_t size;
1835
1836         down_read(&rbd_dev->header_rwsem);
1837         size = get_capacity(rbd_dev->disk);
1838         up_read(&rbd_dev->header_rwsem);
1839
1840         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1841 }
1842
1843 /*
1844  * Note this shows the features for whatever's mapped, which is not
1845  * necessarily the base image.
1846  */
1847 static ssize_t rbd_features_show(struct device *dev,
1848                              struct device_attribute *attr, char *buf)
1849 {
1850         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1851
1852         return sprintf(buf, "0x%016llx\n",
1853                         (unsigned long long) rbd_dev->mapping.features);
1854 }
1855
1856 static ssize_t rbd_major_show(struct device *dev,
1857                               struct device_attribute *attr, char *buf)
1858 {
1859         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1860
1861         return sprintf(buf, "%d\n", rbd_dev->major);
1862 }
1863
1864 static ssize_t rbd_client_id_show(struct device *dev,
1865                                   struct device_attribute *attr, char *buf)
1866 {
1867         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1868
1869         return sprintf(buf, "client%lld\n",
1870                         ceph_client_id(rbd_dev->rbd_client->client));
1871 }
1872
1873 static ssize_t rbd_pool_show(struct device *dev,
1874                              struct device_attribute *attr, char *buf)
1875 {
1876         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1877
1878         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1879 }
1880
1881 static ssize_t rbd_pool_id_show(struct device *dev,
1882                              struct device_attribute *attr, char *buf)
1883 {
1884         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1885
1886         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1887 }
1888
1889 static ssize_t rbd_name_show(struct device *dev,
1890                              struct device_attribute *attr, char *buf)
1891 {
1892         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1893
1894         return sprintf(buf, "%s\n", rbd_dev->image_name);
1895 }
1896
1897 static ssize_t rbd_image_id_show(struct device *dev,
1898                              struct device_attribute *attr, char *buf)
1899 {
1900         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901
1902         return sprintf(buf, "%s\n", rbd_dev->image_id);
1903 }
1904
1905 /*
1906  * Shows the name of the currently-mapped snapshot (or
1907  * RBD_SNAP_HEAD_NAME for the base image).
1908  */
1909 static ssize_t rbd_snap_show(struct device *dev,
1910                              struct device_attribute *attr,
1911                              char *buf)
1912 {
1913         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1914
1915         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1916 }
1917
1918 static ssize_t rbd_image_refresh(struct device *dev,
1919                                  struct device_attribute *attr,
1920                                  const char *buf,
1921                                  size_t size)
1922 {
1923         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924         int ret;
1925
1926         ret = rbd_dev_refresh(rbd_dev, NULL);
1927
1928         return ret < 0 ? ret : size;
1929 }
1930
/*
 * Per-device sysfs attributes.  Everything is read-only (S_IRUGO)
 * except "refresh", which is write-only for the owner (S_IWUSR) and
 * has no show method.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

/* NULL-terminated list of the attributes declared above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1964
/*
 * Device-type release callback; intentionally does nothing.
 * NOTE(review): rbd_bus_add_dev() sets dev->release to rbd_dev_release,
 * which presumably performs the real teardown -- confirm.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type for rbd devices; supplies the sysfs attribute groups. */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1974
1975
1976 /*
1977   sysfs - snapshots
1978 */
1979
1980 static ssize_t rbd_snap_size_show(struct device *dev,
1981                                   struct device_attribute *attr,
1982                                   char *buf)
1983 {
1984         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1985
1986         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1987 }
1988
1989 static ssize_t rbd_snap_id_show(struct device *dev,
1990                                 struct device_attribute *attr,
1991                                 char *buf)
1992 {
1993         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1994
1995         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1996 }
1997
1998 static ssize_t rbd_snap_features_show(struct device *dev,
1999                                 struct device_attribute *attr,
2000                                 char *buf)
2001 {
2002         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2003
2004         return sprintf(buf, "0x%016llx\n",
2005                         (unsigned long long) snap->features);
2006 }
2007
/* Per-snapshot sysfs attributes; all are read-only (S_IRUGO). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2022
2023 static void rbd_snap_dev_release(struct device *dev)
2024 {
2025         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2026         kfree(snap->name);
2027         kfree(snap);
2028 }
2029
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/*
 * Device type for snapshot devices.  rbd_snap_registered() also uses
 * the address of this structure to recognize a registered snapshot.
 */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2039
2040 static bool rbd_snap_registered(struct rbd_snap *snap)
2041 {
2042         bool ret = snap->dev.type == &rbd_snap_device_type;
2043         bool reg = device_is_registered(&snap->dev);
2044
2045         rbd_assert(!ret ^ reg);
2046
2047         return ret;
2048 }
2049
2050 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2051 {
2052         list_del(&snap->node);
2053         if (device_is_registered(&snap->dev))
2054                 device_unregister(&snap->dev);
2055 }
2056
2057 static int rbd_register_snap_dev(struct rbd_snap *snap,
2058                                   struct device *parent)
2059 {
2060         struct device *dev = &snap->dev;
2061         int ret;
2062
2063         dev->type = &rbd_snap_device_type;
2064         dev->parent = parent;
2065         dev->release = rbd_snap_dev_release;
2066         dev_set_name(dev, "snap_%s", snap->name);
2067         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2068
2069         ret = device_register(dev);
2070
2071         return ret;
2072 }
2073
2074 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2075                                                 const char *snap_name,
2076                                                 u64 snap_id, u64 snap_size,
2077                                                 u64 snap_features)
2078 {
2079         struct rbd_snap *snap;
2080         int ret;
2081
2082         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2083         if (!snap)
2084                 return ERR_PTR(-ENOMEM);
2085
2086         ret = -ENOMEM;
2087         snap->name = kstrdup(snap_name, GFP_KERNEL);
2088         if (!snap->name)
2089                 goto err;
2090
2091         snap->id = snap_id;
2092         snap->size = snap_size;
2093         snap->features = snap_features;
2094
2095         return snap;
2096
2097 err:
2098         kfree(snap->name);
2099         kfree(snap);
2100
2101         return ERR_PTR(ret);
2102 }
2103
2104 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2105                 u64 *snap_size, u64 *snap_features)
2106 {
2107         char *snap_name;
2108
2109         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2110
2111         *snap_size = rbd_dev->header.snap_sizes[which];
2112         *snap_features = 0;     /* No features for v1 */
2113
2114         /* Skip over names until we find the one we are looking for */
2115
2116         snap_name = rbd_dev->header.snap_names;
2117         while (which--)
2118                 snap_name += strlen(snap_name) + 1;
2119
2120         return snap_name;
2121 }
2122
2123 /*
2124  * Get the size and object order for an image snapshot, or if
2125  * snap_id is CEPH_NOSNAP, gets this information for the base
2126  * image.
2127  */
2128 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2129                                 u8 *order, u64 *snap_size)
2130 {
2131         __le64 snapid = cpu_to_le64(snap_id);
2132         int ret;
2133         struct {
2134                 u8 order;
2135                 __le64 size;
2136         } __attribute__ ((packed)) size_buf = { 0 };
2137
2138         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2139                                 "rbd", "get_size",
2140                                 (char *) &snapid, sizeof (snapid),
2141                                 (char *) &size_buf, sizeof (size_buf),
2142                                 CEPH_OSD_FLAG_READ, NULL);
2143         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2144         if (ret < 0)
2145                 return ret;
2146
2147         *order = size_buf.order;
2148         *snap_size = le64_to_cpu(size_buf.size);
2149
2150         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2151                 (unsigned long long) snap_id, (unsigned int) *order,
2152                 (unsigned long long) *snap_size);
2153
2154         return 0;
2155 }
2156
2157 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2158 {
2159         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2160                                         &rbd_dev->header.obj_order,
2161                                         &rbd_dev->header.image_size);
2162 }
2163
2164 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2165 {
2166         void *reply_buf;
2167         int ret;
2168         void *p;
2169
2170         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2171         if (!reply_buf)
2172                 return -ENOMEM;
2173
2174         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2175                                 "rbd", "get_object_prefix",
2176                                 NULL, 0,
2177                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2178                                 CEPH_OSD_FLAG_READ, NULL);
2179         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2180         if (ret < 0)
2181                 goto out;
2182         ret = 0;    /* rbd_req_sync_exec() can return positive */
2183
2184         p = reply_buf;
2185         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2186                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2187                                                 NULL, GFP_NOIO);
2188
2189         if (IS_ERR(rbd_dev->header.object_prefix)) {
2190                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2191                 rbd_dev->header.object_prefix = NULL;
2192         } else {
2193                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2194         }
2195
2196 out:
2197         kfree(reply_buf);
2198
2199         return ret;
2200 }
2201
2202 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2203                 u64 *snap_features)
2204 {
2205         __le64 snapid = cpu_to_le64(snap_id);
2206         struct {
2207                 __le64 features;
2208                 __le64 incompat;
2209         } features_buf = { 0 };
2210         u64 incompat;
2211         int ret;
2212
2213         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2214                                 "rbd", "get_features",
2215                                 (char *) &snapid, sizeof (snapid),
2216                                 (char *) &features_buf, sizeof (features_buf),
2217                                 CEPH_OSD_FLAG_READ, NULL);
2218         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2219         if (ret < 0)
2220                 return ret;
2221
2222         incompat = le64_to_cpu(features_buf.incompat);
2223         if (incompat & ~RBD_FEATURES_ALL)
2224                 return -ENOTSUPP;
2225
2226         *snap_features = le64_to_cpu(features_buf.features);
2227
2228         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2229                 (unsigned long long) snap_id,
2230                 (unsigned long long) *snap_features,
2231                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2232
2233         return 0;
2234 }
2235
2236 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2237 {
2238         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2239                                                 &rbd_dev->header.features);
2240 }
2241
2242 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2243 {
2244         size_t size;
2245         int ret;
2246         void *reply_buf;
2247         void *p;
2248         void *end;
2249         u64 seq;
2250         u32 snap_count;
2251         struct ceph_snap_context *snapc;
2252         u32 i;
2253
2254         /*
2255          * We'll need room for the seq value (maximum snapshot id),
2256          * snapshot count, and array of that many snapshot ids.
2257          * For now we have a fixed upper limit on the number we're
2258          * prepared to receive.
2259          */
2260         size = sizeof (__le64) + sizeof (__le32) +
2261                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2262         reply_buf = kzalloc(size, GFP_KERNEL);
2263         if (!reply_buf)
2264                 return -ENOMEM;
2265
2266         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2267                                 "rbd", "get_snapcontext",
2268                                 NULL, 0,
2269                                 reply_buf, size,
2270                                 CEPH_OSD_FLAG_READ, ver);
2271         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2272         if (ret < 0)
2273                 goto out;
2274
2275         ret = -ERANGE;
2276         p = reply_buf;
2277         end = (char *) reply_buf + size;
2278         ceph_decode_64_safe(&p, end, seq, out);
2279         ceph_decode_32_safe(&p, end, snap_count, out);
2280
2281         /*
2282          * Make sure the reported number of snapshot ids wouldn't go
2283          * beyond the end of our buffer.  But before checking that,
2284          * make sure the computed size of the snapshot context we
2285          * allocate is representable in a size_t.
2286          */
2287         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2288                                  / sizeof (u64)) {
2289                 ret = -EINVAL;
2290                 goto out;
2291         }
2292         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2293                 goto out;
2294
2295         size = sizeof (struct ceph_snap_context) +
2296                                 snap_count * sizeof (snapc->snaps[0]);
2297         snapc = kmalloc(size, GFP_KERNEL);
2298         if (!snapc) {
2299                 ret = -ENOMEM;
2300                 goto out;
2301         }
2302
2303         atomic_set(&snapc->nref, 1);
2304         snapc->seq = seq;
2305         snapc->num_snaps = snap_count;
2306         for (i = 0; i < snap_count; i++)
2307                 snapc->snaps[i] = ceph_decode_64(&p);
2308
2309         rbd_dev->header.snapc = snapc;
2310
2311         dout("  snap context seq = %llu, snap_count = %u\n",
2312                 (unsigned long long) seq, (unsigned int) snap_count);
2313
2314 out:
2315         kfree(reply_buf);
2316
2317         return 0;
2318 }
2319
2320 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2321 {
2322         size_t size;
2323         void *reply_buf;
2324         __le64 snap_id;
2325         int ret;
2326         void *p;
2327         void *end;
2328         size_t snap_name_len;
2329         char *snap_name;
2330
2331         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2332         reply_buf = kmalloc(size, GFP_KERNEL);
2333         if (!reply_buf)
2334                 return ERR_PTR(-ENOMEM);
2335
2336         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2337         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2338                                 "rbd", "get_snapshot_name",
2339                                 (char *) &snap_id, sizeof (snap_id),
2340                                 reply_buf, size,
2341                                 CEPH_OSD_FLAG_READ, NULL);
2342         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2343         if (ret < 0)
2344                 goto out;
2345
2346         p = reply_buf;
2347         end = (char *) reply_buf + size;
2348         snap_name_len = 0;
2349         snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2350                                 GFP_KERNEL);
2351         if (IS_ERR(snap_name)) {
2352                 ret = PTR_ERR(snap_name);
2353                 goto out;
2354         } else {
2355                 dout("  snap_id 0x%016llx snap_name = %s\n",
2356                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2357         }
2358         kfree(reply_buf);
2359
2360         return snap_name;
2361 out:
2362         kfree(reply_buf);
2363
2364         return ERR_PTR(ret);
2365 }
2366
2367 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2368                 u64 *snap_size, u64 *snap_features)
2369 {
2370         __le64 snap_id;
2371         u8 order;
2372         int ret;
2373
2374         snap_id = rbd_dev->header.snapc->snaps[which];
2375         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2376         if (ret)
2377                 return ERR_PTR(ret);
2378         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2379         if (ret)
2380                 return ERR_PTR(ret);
2381
2382         return rbd_dev_v2_snap_name(rbd_dev, which);
2383 }
2384
2385 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2386                 u64 *snap_size, u64 *snap_features)
2387 {
2388         if (rbd_dev->image_format == 1)
2389                 return rbd_dev_v1_snap_info(rbd_dev, which,
2390                                         snap_size, snap_features);
2391         if (rbd_dev->image_format == 2)
2392                 return rbd_dev_v2_snap_info(rbd_dev, which,
2393                                         snap_size, snap_features);
2394         return ERR_PTR(-EINVAL);
2395 }
2396
2397 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2398 {
2399         int ret;
2400         __u8 obj_order;
2401
2402         down_write(&rbd_dev->header_rwsem);
2403
2404         /* Grab old order first, to see if it changes */
2405
2406         obj_order = rbd_dev->header.obj_order,
2407         ret = rbd_dev_v2_image_size(rbd_dev);
2408         if (ret)
2409                 goto out;
2410         if (rbd_dev->header.obj_order != obj_order) {
2411                 ret = -EIO;
2412                 goto out;
2413         }
2414         rbd_update_mapping_size(rbd_dev);
2415
2416         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2417         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2418         if (ret)
2419                 goto out;
2420         ret = rbd_dev_snaps_update(rbd_dev);
2421         dout("rbd_dev_snaps_update returned %d\n", ret);
2422         if (ret)
2423                 goto out;
2424         ret = rbd_dev_snaps_register(rbd_dev);
2425         dout("rbd_dev_snaps_register returned %d\n", ret);
2426 out:
2427         up_write(&rbd_dev->header_rwsem);
2428
2429         return ret;
2430 }
2431
2432 /*
2433  * Scan the rbd device's current snapshot list and compare it to the
2434  * newly-received snapshot context.  Remove any existing snapshots
2435  * not present in the new snapshot context.  Add a new snapshot for
2436  * any snaphots in the snapshot context not in the current list.
2437  * And verify there are no changes to snapshots we already know
2438  * about.
2439  *
2440  * Assumes the snapshots in the snapshot context are sorted by
2441  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2442  * are also maintained in that order.)
2443  */
2444 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2445 {
2446         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2447         const u32 snap_count = snapc->num_snaps;
2448         struct list_head *head = &rbd_dev->snaps;
2449         struct list_head *links = head->next;
2450         u32 index = 0;
2451
2452         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2453         while (index < snap_count || links != head) {
2454                 u64 snap_id;
2455                 struct rbd_snap *snap;
2456                 char *snap_name;
2457                 u64 snap_size = 0;
2458                 u64 snap_features = 0;
2459
2460                 snap_id = index < snap_count ? snapc->snaps[index]
2461                                              : CEPH_NOSNAP;
2462                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2463                                      : NULL;
2464                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2465
2466                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2467                         struct list_head *next = links->next;
2468
2469                         /* Existing snapshot not in the new snap context */
2470
2471                         if (rbd_dev->mapping.snap_id == snap->id)
2472                                 rbd_dev->mapping.snap_exists = false;
2473                         __rbd_remove_snap_dev(snap);
2474                         dout("%ssnap id %llu has been removed\n",
2475                                 rbd_dev->mapping.snap_id == snap->id ?
2476                                                                 "mapped " : "",
2477                                 (unsigned long long) snap->id);
2478
2479                         /* Done with this list entry; advance */
2480
2481                         links = next;
2482                         continue;
2483                 }
2484
2485                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2486                                         &snap_size, &snap_features);
2487                 if (IS_ERR(snap_name))
2488                         return PTR_ERR(snap_name);
2489
2490                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2491                         (unsigned long long) snap_id);
2492                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2493                         struct rbd_snap *new_snap;
2494
2495                         /* We haven't seen this snapshot before */
2496
2497                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2498                                         snap_id, snap_size, snap_features);
2499                         if (IS_ERR(new_snap)) {
2500                                 int err = PTR_ERR(new_snap);
2501
2502                                 dout("  failed to add dev, error %d\n", err);
2503
2504                                 return err;
2505                         }
2506
2507                         /* New goes before existing, or at end of list */
2508
2509                         dout("  added dev%s\n", snap ? "" : " at end\n");
2510                         if (snap)
2511                                 list_add_tail(&new_snap->node, &snap->node);
2512                         else
2513                                 list_add_tail(&new_snap->node, head);
2514                 } else {
2515                         /* Already have this one */
2516
2517                         dout("  already present\n");
2518
2519                         rbd_assert(snap->size == snap_size);
2520                         rbd_assert(!strcmp(snap->name, snap_name));
2521                         rbd_assert(snap->features == snap_features);
2522
2523                         /* Done with this list entry; advance */
2524
2525                         links = links->next;
2526                 }
2527
2528                 /* Advance to the next entry in the snapshot context */
2529
2530                 index++;
2531         }
2532         dout("%s: done\n", __func__);
2533
2534         return 0;
2535 }
2536
2537 /*
2538  * Scan the list of snapshots and register the devices for any that
2539  * have not already been registered.
2540  */
2541 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2542 {
2543         struct rbd_snap *snap;
2544         int ret = 0;
2545
2546         dout("%s called\n", __func__);
2547         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2548                 return -EIO;
2549
2550         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2551                 if (!rbd_snap_registered(snap)) {
2552                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2553                         if (ret < 0)
2554                                 break;
2555                 }
2556         }
2557         dout("%s: returning %d\n", __func__, ret);
2558
2559         return ret;
2560 }
2561
2562 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2563 {
2564         struct device *dev;
2565         int ret;
2566
2567         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2568
2569         dev = &rbd_dev->dev;
2570         dev->bus = &rbd_bus_type;
2571         dev->type = &rbd_device_type;
2572         dev->parent = &rbd_root_dev;
2573         dev->release = rbd_dev_release;
2574         dev_set_name(dev, "%d", rbd_dev->dev_id);
2575         ret = device_register(dev);
2576
2577         mutex_unlock(&ctl_mutex);
2578
2579         return ret;
2580 }
2581
2582 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2583 {
2584         device_unregister(&rbd_dev->dev);
2585 }
2586
2587 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2588 {
2589         int ret, rc;
2590
2591         do {
2592                 ret = rbd_req_sync_watch(rbd_dev);
2593                 if (ret == -ERANGE) {
2594                         rc = rbd_dev_refresh(rbd_dev, NULL);
2595                         if (rc < 0)
2596                                 return rc;
2597                 }
2598         } while (ret == -ERANGE);
2599
2600         return ret;
2601 }
2602
2603 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2604
2605 /*
2606  * Get a unique rbd identifier for the given new rbd_dev, and add
2607  * the rbd_dev to the global list.  The minimum rbd id is 1.
2608  */
2609 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2610 {
2611         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2612
2613         spin_lock(&rbd_dev_list_lock);
2614         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2615         spin_unlock(&rbd_dev_list_lock);
2616         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2617                 (unsigned long long) rbd_dev->dev_id);
2618 }
2619
2620 /*
2621  * Remove an rbd_dev from the global list, and record that its
2622  * identifier is no longer in use.
2623  */
2624 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2625 {
2626         struct list_head *tmp;
2627         int rbd_id = rbd_dev->dev_id;
2628         int max_id;
2629
2630         rbd_assert(rbd_id > 0);
2631
2632         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2633                 (unsigned long long) rbd_dev->dev_id);
2634         spin_lock(&rbd_dev_list_lock);
2635         list_del_init(&rbd_dev->node);
2636
2637         /*
2638          * If the id being "put" is not the current maximum, there
2639          * is nothing special we need to do.
2640          */
2641         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2642                 spin_unlock(&rbd_dev_list_lock);
2643                 return;
2644         }
2645
2646         /*
2647          * We need to update the current maximum id.  Search the
2648          * list to find out what it is.  We're more likely to find
2649          * the maximum at the end, so search the list backward.
2650          */
2651         max_id = 0;
2652         list_for_each_prev(tmp, &rbd_dev_list) {
2653                 struct rbd_device *rbd_dev;
2654
2655                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2656                 if (rbd_dev->dev_id > max_id)
2657                         max_id = rbd_dev->dev_id;
2658         }
2659         spin_unlock(&rbd_dev_list_lock);
2660
2661         /*
2662          * The max id could have been updated by rbd_dev_id_get(), in
2663          * which case it now accurately reflects the new maximum.
2664          * Be careful not to overwrite the maximum value in that
2665          * case.
2666          */
2667         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2668         dout("  max dev id has been reset\n");
2669 }
2670
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token that starts there, i.e.
 * the number of characters up to the next white space or '\0'.
 *
 * *buf must be a '\0'-terminated string.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;		/* Token (if any) begins here */

	return strcspn(start, whitespace);
}
2689
2690 /*
2691  * Finds the next token in *buf, and if the provided token buffer is
2692  * big enough, copies the found token into it.  The result, if
2693  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2694  * must be terminated with '\0' on entry.
2695  *
2696  * Returns the length of the token found (not including the '\0').
2697  * Return value will be 0 if no token is found, and it will be >=
2698  * token_size if the token would not fit.
2699  *
2700  * The *buf pointer will be updated to point beyond the end of the
2701  * found token.  Note that this occurs even if the token buffer is
2702  * too small to hold it.
2703  */
2704 static inline size_t copy_token(const char **buf,
2705                                 char *token,
2706                                 size_t token_size)
2707 {
2708         size_t len;
2709
2710         len = next_token(buf);
2711         if (len < token_size) {
2712                 memcpy(token, *buf, len);
2713                 *(token + len) = '\0';
2714         }
2715         *buf += len;
2716
2717         return len;
2718 }
2719
2720 /*
2721  * Finds the next token in *buf, dynamically allocates a buffer big
2722  * enough to hold a copy of it, and copies the token into the new
2723  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2724  * that a duplicate buffer is created even for a zero-length token.
2725  *
2726  * Returns a pointer to the newly-allocated duplicate, or a null
2727  * pointer if memory for the duplicate was not available.  If
2728  * the lenp argument is a non-null pointer, the length of the token
2729  * (not including the '\0') is returned in *lenp.
2730  *
2731  * If successful, the *buf pointer will be updated to point beyond
2732  * the end of the found token.
2733  *
2734  * Note: uses GFP_KERNEL for allocation.
2735  */
2736 static inline char *dup_token(const char **buf, size_t *lenp)
2737 {
2738         char *dup;
2739         size_t len;
2740
2741         len = next_token(buf);
2742         dup = kmalloc(len + 1, GFP_KERNEL);
2743         if (!dup)
2744                 return NULL;
2745
2746         memcpy(dup, *buf, len);
2747         *(dup + len) = '\0';
2748         *buf += len;
2749
2750         if (lenp)
2751                 *lenp = len;
2752
2753         return dup;
2754 }
2755
2756 /*
2757  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2758  * rbd_md_name, and name fields of the given rbd_dev, based on the
2759  * list of monitor addresses and other options provided via
2760  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2761  * copy of the snapshot name to map if successful, or a
2762  * pointer-coded error otherwise.
2763  *
2764  * Note: rbd_dev is assumed to have been initially zero-filled.
2765  */
2766 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2767                                 const char *buf,
2768                                 const char **mon_addrs,
2769                                 size_t *mon_addrs_size,
2770                                 char *options,
2771                                 size_t options_size)
2772 {
2773         size_t len;
2774         char *err_ptr = ERR_PTR(-EINVAL);
2775         char *snap_name;
2776
2777         /* The first four tokens are required */
2778
2779         len = next_token(&buf);
2780         if (!len)
2781                 return err_ptr;
2782         *mon_addrs_size = len + 1;
2783         *mon_addrs = buf;
2784
2785         buf += len;
2786
2787         len = copy_token(&buf, options, options_size);
2788         if (!len || len >= options_size)
2789                 return err_ptr;
2790
2791         err_ptr = ERR_PTR(-ENOMEM);
2792         rbd_dev->pool_name = dup_token(&buf, NULL);
2793         if (!rbd_dev->pool_name)
2794                 goto out_err;
2795
2796         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2797         if (!rbd_dev->image_name)
2798                 goto out_err;
2799
2800         /* Snapshot name is optional */
2801         len = next_token(&buf);
2802         if (!len) {
2803                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2804                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2805         }
2806         snap_name = kmalloc(len + 1, GFP_KERNEL);
2807         if (!snap_name)
2808                 goto out_err;
2809         memcpy(snap_name, buf, len);
2810         *(snap_name + len) = '\0';
2811
2812 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2813
2814         return snap_name;
2815
2816 out_err:
2817         kfree(rbd_dev->image_name);
2818         rbd_dev->image_name = NULL;
2819         rbd_dev->image_name_len = 0;
2820         kfree(rbd_dev->pool_name);
2821         rbd_dev->pool_name = NULL;
2822
2823         return err_ptr;
2824 }
2825
2826 /*
2827  * An rbd format 2 image has a unique identifier, distinct from the
2828  * name given to it by the user.  Internally, that identifier is
2829  * what's used to specify the names of objects related to the image.
2830  *
2831  * A special "rbd id" object is used to map an rbd image name to its
2832  * id.  If that object doesn't exist, then there is no v2 rbd image
2833  * with the supplied name.
2834  *
2835  * This function will record the given rbd_dev's image_id field if
2836  * it can be determined, and in that case will return 0.  If any
2837  * errors occur a negative errno will be returned and the rbd_dev's
2838  * image_id field will be unchanged (and should be NULL).
2839  */
2840 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2841 {
2842         int ret;
2843         size_t size;
2844         char *object_name;
2845         void *response;
2846         void *p;
2847
2848         /*
2849          * First, see if the format 2 image id file exists, and if
2850          * so, get the image's persistent id from it.
2851          */
2852         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2853         object_name = kmalloc(size, GFP_NOIO);
2854         if (!object_name)
2855                 return -ENOMEM;
2856         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2857         dout("rbd id object name is %s\n", object_name);
2858
2859         /* Response will be an encoded string, which includes a length */
2860
2861         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2862         response = kzalloc(size, GFP_NOIO);
2863         if (!response) {
2864                 ret = -ENOMEM;
2865                 goto out;
2866         }
2867
2868         ret = rbd_req_sync_exec(rbd_dev, object_name,
2869                                 "rbd", "get_id",
2870                                 NULL, 0,
2871                                 response, RBD_IMAGE_ID_LEN_MAX,
2872                                 CEPH_OSD_FLAG_READ, NULL);
2873         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2874         if (ret < 0)
2875                 goto out;
2876         ret = 0;    /* rbd_req_sync_exec() can return positive */
2877
2878         p = response;
2879         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2880                                                 p + RBD_IMAGE_ID_LEN_MAX,
2881                                                 &rbd_dev->image_id_len,
2882                                                 GFP_NOIO);
2883         if (IS_ERR(rbd_dev->image_id)) {
2884                 ret = PTR_ERR(rbd_dev->image_id);
2885                 rbd_dev->image_id = NULL;
2886         } else {
2887                 dout("image_id is %s\n", rbd_dev->image_id);
2888         }
2889 out:
2890         kfree(response);
2891         kfree(object_name);
2892
2893         return ret;
2894 }
2895
2896 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2897 {
2898         int ret;
2899         size_t size;
2900
2901         /* Version 1 images have no id; empty string is used */
2902
2903         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2904         if (!rbd_dev->image_id)
2905                 return -ENOMEM;
2906         rbd_dev->image_id_len = 0;
2907
2908         /* Record the header object name for this rbd image. */
2909
2910         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2911         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2912         if (!rbd_dev->header_name) {
2913                 ret = -ENOMEM;
2914                 goto out_err;
2915         }
2916         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2917
2918         /* Populate rbd image metadata */
2919
2920         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2921         if (ret < 0)
2922                 goto out_err;
2923         rbd_dev->image_format = 1;
2924
2925         dout("discovered version 1 image, header name is %s\n",
2926                 rbd_dev->header_name);
2927
2928         return 0;
2929
2930 out_err:
2931         kfree(rbd_dev->header_name);
2932         rbd_dev->header_name = NULL;
2933         kfree(rbd_dev->image_id);
2934         rbd_dev->image_id = NULL;
2935
2936         return ret;
2937 }
2938
2939 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2940 {
2941         size_t size;
2942         int ret;
2943         u64 ver = 0;
2944
2945         /*
2946          * Image id was filled in by the caller.  Record the header
2947          * object name for this rbd image.
2948          */
2949         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2950         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2951         if (!rbd_dev->header_name)
2952                 return -ENOMEM;
2953         sprintf(rbd_dev->header_name, "%s%s",
2954                         RBD_HEADER_PREFIX, rbd_dev->image_id);
2955
2956         /* Get the size and object order for the image */
2957
2958         ret = rbd_dev_v2_image_size(rbd_dev);
2959         if (ret < 0)
2960                 goto out_err;
2961
2962         /* Get the object prefix (a.k.a. block_name) for the image */
2963
2964         ret = rbd_dev_v2_object_prefix(rbd_dev);
2965         if (ret < 0)
2966                 goto out_err;
2967
2968         /* Get the and check features for the image */
2969
2970         ret = rbd_dev_v2_features(rbd_dev);
2971         if (ret < 0)
2972                 goto out_err;
2973
2974         /* crypto and compression type aren't (yet) supported for v2 images */
2975
2976         rbd_dev->header.crypt_type = 0;
2977         rbd_dev->header.comp_type = 0;
2978
2979         /* Get the snapshot context, plus the header version */
2980
2981         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2982         if (ret)
2983                 goto out_err;
2984         rbd_dev->header.obj_version = ver;
2985
2986         rbd_dev->image_format = 2;
2987
2988         dout("discovered version 2 image, header name is %s\n",
2989                 rbd_dev->header_name);
2990
2991         return 0;
2992 out_err:
2993         kfree(rbd_dev->header_name);
2994         rbd_dev->header_name = NULL;
2995         kfree(rbd_dev->header.object_prefix);
2996         rbd_dev->header.object_prefix = NULL;
2997
2998         return ret;
2999 }
3000
3001 /*
3002  * Probe for the existence of the header object for the given rbd
3003  * device.  For format 2 images this includes determining the image
3004  * id.
3005  */
3006 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3007 {
3008         int ret;
3009
3010         /*
3011          * Get the id from the image id object.  If it's not a
3012          * format 2 image, we'll get ENOENT back, and we'll assume
3013          * it's a format 1 image.
3014          */
3015         ret = rbd_dev_image_id(rbd_dev);
3016         if (ret)
3017                 ret = rbd_dev_v1_probe(rbd_dev);
3018         else
3019                 ret = rbd_dev_v2_probe(rbd_dev);
3020         if (ret)
3021                 dout("probe failed, returning %d\n", ret);
3022
3023         return ret;
3024 }
3025
3026 static ssize_t rbd_add(struct bus_type *bus,
3027                        const char *buf,
3028                        size_t count)
3029 {
3030         char *options;
3031         struct rbd_device *rbd_dev = NULL;
3032         const char *mon_addrs = NULL;
3033         size_t mon_addrs_size = 0;
3034         struct ceph_osd_client *osdc;
3035         int rc = -ENOMEM;
3036         char *snap_name;
3037
3038         if (!try_module_get(THIS_MODULE))
3039                 return -ENODEV;
3040
3041         options = kmalloc(count, GFP_KERNEL);
3042         if (!options)
3043                 goto err_out_mem;
3044         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3045         if (!rbd_dev)
3046                 goto err_out_mem;
3047
3048         /* static rbd_device initialization */
3049         spin_lock_init(&rbd_dev->lock);
3050         INIT_LIST_HEAD(&rbd_dev->node);
3051         INIT_LIST_HEAD(&rbd_dev->snaps);
3052         init_rwsem(&rbd_dev->header_rwsem);
3053
3054         /* parse add command */
3055         snap_name = rbd_add_parse_args(rbd_dev, buf,
3056                                 &mon_addrs, &mon_addrs_size, options, count);
3057         if (IS_ERR(snap_name)) {
3058                 rc = PTR_ERR(snap_name);
3059                 goto err_out_mem;
3060         }
3061
3062         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3063         if (rc < 0)
3064                 goto err_out_args;
3065
3066         /* pick the pool */
3067         osdc = &rbd_dev->rbd_client->client->osdc;
3068         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3069         if (rc < 0)
3070                 goto err_out_client;
3071         rbd_dev->pool_id = rc;
3072
3073         rc = rbd_dev_probe(rbd_dev);
3074         if (rc < 0)
3075                 goto err_out_client;
3076
3077         /* no need to lock here, as rbd_dev is not registered yet */
3078         rc = rbd_dev_snaps_update(rbd_dev);
3079         if (rc)
3080                 goto err_out_header;
3081
3082         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3083         if (rc)
3084                 goto err_out_header;
3085
3086         /* generate unique id: find highest unique id, add one */
3087         rbd_dev_id_get(rbd_dev);
3088
3089         /* Fill in the device name, now that we have its id. */
3090         BUILD_BUG_ON(DEV_NAME_LEN
3091                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3092         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3093
3094         /* Get our block major device number. */
3095
3096         rc = register_blkdev(0, rbd_dev->name);
3097         if (rc < 0)
3098                 goto err_out_id;
3099         rbd_dev->major = rc;
3100
3101         /* Set up the blkdev mapping. */
3102
3103         rc = rbd_init_disk(rbd_dev);
3104         if (rc)
3105                 goto err_out_blkdev;
3106
3107         rc = rbd_bus_add_dev(rbd_dev);
3108         if (rc)
3109                 goto err_out_disk;
3110
3111         /*
3112          * At this point cleanup in the event of an error is the job
3113          * of the sysfs code (initiated by rbd_bus_del_dev()).
3114          */
3115
3116         down_write(&rbd_dev->header_rwsem);
3117         rc = rbd_dev_snaps_register(rbd_dev);
3118         up_write(&rbd_dev->header_rwsem);
3119         if (rc)
3120                 goto err_out_bus;
3121
3122         rc = rbd_init_watch_dev(rbd_dev);
3123         if (rc)
3124                 goto err_out_bus;
3125
3126         /* Everything's ready.  Announce the disk to the world. */
3127
3128         add_disk(rbd_dev->disk);
3129
3130         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3131                 (unsigned long long) rbd_dev->mapping.size);
3132
3133         return count;
3134
3135 err_out_bus:
3136         /* this will also clean up rest of rbd_dev stuff */
3137
3138         rbd_bus_del_dev(rbd_dev);
3139         kfree(options);
3140         return rc;
3141
3142 err_out_disk:
3143         rbd_free_disk(rbd_dev);
3144 err_out_blkdev:
3145         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3146 err_out_id:
3147         rbd_dev_id_put(rbd_dev);
3148 err_out_header:
3149         rbd_header_free(&rbd_dev->header);
3150 err_out_client:
3151         kfree(rbd_dev->header_name);
3152         rbd_put_client(rbd_dev);
3153         kfree(rbd_dev->image_id);
3154 err_out_args:
3155         kfree(rbd_dev->mapping.snap_name);
3156         kfree(rbd_dev->image_name);
3157         kfree(rbd_dev->pool_name);
3158 err_out_mem:
3159         kfree(rbd_dev);
3160         kfree(options);
3161
3162         dout("Error adding device %s\n", buf);
3163         module_put(THIS_MODULE);
3164
3165         return (ssize_t) rc;
3166 }
3167
3168 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3169 {
3170         struct list_head *tmp;
3171         struct rbd_device *rbd_dev;
3172
3173         spin_lock(&rbd_dev_list_lock);
3174         list_for_each(tmp, &rbd_dev_list) {
3175                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3176                 if (rbd_dev->dev_id == dev_id) {
3177                         spin_unlock(&rbd_dev_list_lock);
3178                         return rbd_dev;
3179                 }
3180         }
3181         spin_unlock(&rbd_dev_list_lock);
3182         return NULL;
3183 }
3184
3185 static void rbd_dev_release(struct device *dev)
3186 {
3187         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3188
3189         if (rbd_dev->watch_request) {
3190                 struct ceph_client *client = rbd_dev->rbd_client->client;
3191
3192                 ceph_osdc_unregister_linger_request(&client->osdc,
3193                                                     rbd_dev->watch_request);
3194         }
3195         if (rbd_dev->watch_event)
3196                 rbd_req_sync_unwatch(rbd_dev);
3197
3198         rbd_put_client(rbd_dev);
3199
3200         /* clean up and free blkdev */
3201         rbd_free_disk(rbd_dev);
3202         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3203
3204         /* release allocated disk header fields */
3205         rbd_header_free(&rbd_dev->header);
3206
3207         /* done with the id, and with the rbd_dev */
3208         kfree(rbd_dev->mapping.snap_name);
3209         kfree(rbd_dev->image_id);
3210         kfree(rbd_dev->header_name);
3211         kfree(rbd_dev->pool_name);
3212         kfree(rbd_dev->image_name);
3213         rbd_dev_id_put(rbd_dev);
3214         kfree(rbd_dev);
3215
3216         /* release module ref */
3217         module_put(THIS_MODULE);
3218 }
3219
3220 static ssize_t rbd_remove(struct bus_type *bus,
3221                           const char *buf,
3222                           size_t count)
3223 {
3224         struct rbd_device *rbd_dev = NULL;
3225         int target_id, rc;
3226         unsigned long ul;
3227         int ret = count;
3228
3229         rc = strict_strtoul(buf, 10, &ul);
3230         if (rc)
3231                 return rc;
3232
3233         /* convert to int; abort if we lost anything in the conversion */
3234         target_id = (int) ul;
3235         if (target_id != ul)
3236                 return -EINVAL;
3237
3238         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3239
3240         rbd_dev = __rbd_get_dev(target_id);
3241         if (!rbd_dev) {
3242                 ret = -ENOENT;
3243                 goto done;
3244         }
3245
3246         __rbd_remove_all_snaps(rbd_dev);
3247         rbd_bus_del_dev(rbd_dev);
3248
3249 done:
3250         mutex_unlock(&ctl_mutex);
3251
3252         return ret;
3253 }
3254
3255 /*
3256  * create control files in sysfs
3257  * /sys/bus/rbd/...
3258  */
3259 static int rbd_sysfs_init(void)
3260 {
3261         int ret;
3262
3263         ret = device_register(&rbd_root_dev);
3264         if (ret < 0)
3265                 return ret;
3266
3267         ret = bus_register(&rbd_bus_type);
3268         if (ret < 0)
3269                 device_unregister(&rbd_root_dev);
3270
3271         return ret;
3272 }
3273
/*
 * Undo rbd_sysfs_init(): unregister the rbd bus type, then the rbd
 * root device (the reverse of the order they were registered in).
 */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
3279
3280 int __init rbd_init(void)
3281 {
3282         int rc;
3283
3284         rc = rbd_sysfs_init();
3285         if (rc)
3286                 return rc;
3287         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3288         return 0;
3289 }
3290
/* Module exit point: remove the sysfs control files. */
void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
3295
3296 module_init(rbd_init);
3297 module_exit(rbd_exit);
3298
3299 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3300 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3301 MODULE_DESCRIPTION("rados block device");
3302
3303 /* following authorship retained from original osdblk.c */
3304 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3305
3306 MODULE_LICENSE("GPL");