]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
a528d4ca7a67619def59a62cb5153497bb7ac57f
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;    /* NUL-terminated data-object name prefix */
        u64 features;           /* RBD_FEATURE_* bits; always 0 for format 1 images */
        __u8 obj_order;         /* log2 of the object (segment) size */
        __u8 crypt_type;        /* on-disk crypt type, copied from ondisk options */
        __u8 comp_type;         /* on-disk compression type, copied from ondisk options */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* current image size in bytes */
        struct ceph_snap_context *snapc;        /* snapshot ids + sequence number */
        char *snap_names;       /* packed NUL-terminated snapshot names */
        u64 *snap_sizes;        /* per-snapshot image sizes, same order as snapc */

        u64 obj_version;        /* header object version — NOTE(review): used by refresh; confirm exact semantics */
};
114
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.
 */
struct rbd_spec {
        u64             pool_id;        /* ceph pool holding the image */
        char            *pool_name;

        char            *image_id;      /* image's unique id string */
        size_t          image_id_len;
        char            *image_name;    /* user-visible image name */
        size_t          image_name_len;

        u64             snap_id;        /* CEPH_NOSNAP when mapping the head */
        char            *snap_name;     /* RBD_SNAP_HEAD_NAME ("-") for the head */

        struct kref     kref;           /* specs may be shared; refcounted */
};
135
/* Per-mapping options parsed by parse_rbd_opts_token(). */
struct rbd_options {
        bool    read_only;      /* map read-only; default RBD_READ_ONLY_DEFAULT */
};
139
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying ceph client */
        struct kref             kref;           /* shared via rbd_client_find() */
        struct list_head        node;           /* entry on rbd_client_list */
};
148
/*
 * a request completion status
 */
struct rbd_req_status {
        int done;       /* nonzero once this sub-request has completed */
        int rc;         /* completion result code */
        u64 bytes;      /* byte count to complete in the blk request */
};
157
/*
 * a collection of requests
 */
struct rbd_req_coll {
        int                     total;          /* number of status slots */
        int                     num_done;       /* contiguous slots completed so far */
        struct kref             kref;           /* freed via rbd_coll_release() */
        struct rbd_req_status   status[0];      /* variable-length trailing array */
};
167
/*
 * a single io request
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* length of this sub-request */
        int                     coll_index;     /* slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* parent collection (may be NULL) */
};
179
/* In-memory record of one image snapshot, exposed via sysfs. */
struct rbd_snap {
        struct  device          dev;            /* sysfs device for this snapshot */
        const char              *name;          /* snapshot name */
        u64                     size;           /* image size at snapshot time */
        struct list_head        node;           /* entry on rbd_dev->snaps */
        u64                     id;             /* snapshot id */
        u64                     features;       /* feature bits for this snapshot */
};
188
/* State of the image as currently mapped (the head or one snapshot). */
struct rbd_mapping {
        u64                     size;           /* mapped size in bytes */
        u64                     features;       /* feature bits of what is mapped */
        bool                    read_only;      /* forced true when a snapshot is mapped */
};
194
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;    /* shared ceph client handle */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;         /* current image metadata */
        bool                    exists;         /* mapped head/snapshot is valid */
        struct rbd_spec         *spec;          /* pool/image/snap identification */

        char                    *header_name;   /* name of the image header object */

        struct ceph_osd_event   *watch_event;   /* watch registration on the header */
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* state of the current mapping */

        struct list_head        node;           /* entry on rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
233
234 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
235
236 static LIST_HEAD(rbd_dev_list);    /* devices */
237 static DEFINE_SPINLOCK(rbd_dev_list_lock);
238
239 static LIST_HEAD(rbd_client_list);              /* clients */
240 static DEFINE_SPINLOCK(rbd_client_list_lock);
241
242 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
243 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
244
245 static void rbd_dev_release(struct device *dev);
246 static void rbd_remove_snap_dev(struct rbd_snap *snap);
247
248 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
249                        size_t count);
250 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
251                           size_t count);
252
/* Bus attributes: write-only /sys/bus/rbd/add and /sys/bus/rbd/remove. */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};
258
/* The "rbd" bus; parent bus type for all rbd devices in sysfs. */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
263
/*
 * Intentionally-empty release: rbd_root_dev below is statically
 * allocated, so there is nothing to free when its last reference drops.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
267
/* Root device under which all rbd devices are grouped in sysfs. */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
272
#ifdef RBD_DEBUG
/*
 * Verify a runtime invariant; report and BUG() on failure.
 *
 * Wrapped in do { } while (0) so the macro expands to a single
 * statement: the previous bare "if () { }" form would bind a
 * following "else" to the macro's "if" when used unbraced, e.g.
 *      if (cond) rbd_assert(x); else ...
 */
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                        "at line %d:\n\n" \
                                                "\trbd_assert(%s);\n\n", \
                                                __func__, __LINE__, #expr); \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
285
286 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
287 {
288         return get_device(&rbd_dev->dev);
289 }
290
291 static void rbd_put_dev(struct rbd_device *rbd_dev)
292 {
293         put_device(&rbd_dev->dev);
294 }
295
296 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
297 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
298
299 static int rbd_open(struct block_device *bdev, fmode_t mode)
300 {
301         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
302
303         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
304                 return -EROFS;
305
306         rbd_get_dev(rbd_dev);
307         set_device_ro(bdev, rbd_dev->mapping.read_only);
308
309         return 0;
310 }
311
312 static int rbd_release(struct gendisk *disk, fmode_t mode)
313 {
314         struct rbd_device *rbd_dev = disk->private_data;
315
316         rbd_put_dev(rbd_dev);
317
318         return 0;
319 }
320
/* Block device ops: open/release only manage the device refcount. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
326
327 /*
328  * Initialize an rbd client instance.
329  * We own *ceph_opts.
330  */
331 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
332 {
333         struct rbd_client *rbdc;
334         int ret = -ENOMEM;
335
336         dout("rbd_client_create\n");
337         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
338         if (!rbdc)
339                 goto out_opt;
340
341         kref_init(&rbdc->kref);
342         INIT_LIST_HEAD(&rbdc->node);
343
344         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
345
346         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
347         if (IS_ERR(rbdc->client))
348                 goto out_mutex;
349         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
350
351         ret = ceph_open_session(rbdc->client);
352         if (ret < 0)
353                 goto out_err;
354
355         spin_lock(&rbd_client_list_lock);
356         list_add_tail(&rbdc->node, &rbd_client_list);
357         spin_unlock(&rbd_client_list_lock);
358
359         mutex_unlock(&ctl_mutex);
360
361         dout("rbd_client_create created %p\n", rbdc);
362         return rbdc;
363
364 out_err:
365         ceph_destroy_client(rbdc->client);
366 out_mutex:
367         mutex_unlock(&ctl_mutex);
368         kfree(rbdc);
369 out_opt:
370         if (ceph_opts)
371                 ceph_destroy_options(ceph_opts);
372         return ERR_PTR(ret);
373 }
374
375 /*
376  * Find a ceph client with specific addr and configuration.  If
377  * found, bump its reference count.
378  */
379 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
380 {
381         struct rbd_client *client_node;
382         bool found = false;
383
384         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
385                 return NULL;
386
387         spin_lock(&rbd_client_list_lock);
388         list_for_each_entry(client_node, &rbd_client_list, node) {
389                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
390                         kref_get(&client_node->kref);
391                         found = true;
392                         break;
393                 }
394         }
395         spin_unlock(&rbd_client_list_lock);
396
397         return found ? client_node : NULL;
398 }
399
/*
 * mount options
 *
 * The enum is split into ranges by the Opt_last_* sentinels so
 * parse_rbd_opts_token() can tell int, string and Boolean options
 * apart; the token table's groups must stay in the same order.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
424
425 static int parse_rbd_opts_token(char *c, void *private)
426 {
427         struct rbd_options *rbd_opts = private;
428         substring_t argstr[MAX_OPT_ARGS];
429         int token, intval, ret;
430
431         token = match_token(c, rbd_opts_tokens, argstr);
432         if (token < 0)
433                 return -EINVAL;
434
435         if (token < Opt_last_int) {
436                 ret = match_int(&argstr[0], &intval);
437                 if (ret < 0) {
438                         pr_err("bad mount option arg (not int) "
439                                "at '%s'\n", c);
440                         return ret;
441                 }
442                 dout("got int token %d val %d\n", token, intval);
443         } else if (token > Opt_last_int && token < Opt_last_string) {
444                 dout("got string token %d val %s\n", token,
445                      argstr[0].from);
446         } else if (token > Opt_last_string && token < Opt_last_bool) {
447                 dout("got Boolean token %d\n", token);
448         } else {
449                 dout("got token %d\n", token);
450         }
451
452         switch (token) {
453         case Opt_read_only:
454                 rbd_opts->read_only = true;
455                 break;
456         case Opt_read_write:
457                 rbd_opts->read_only = false;
458                 break;
459         default:
460                 rbd_assert(false);
461                 break;
462         }
463         return 0;
464 }
465
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  In both cases ownership of ceph_opts is
 * consumed here (destroyed when reusing, handed off when creating).
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client; the options are no longer needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
482
/*
 * Destroy ceph client
 *
 * Removes the client from rbd_client_list; takes rbd_client_list_lock
 * itself, so the caller must NOT hold it.
 */
488 static void rbd_client_release(struct kref *kref)
489 {
490         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
491
492         dout("rbd_release_client %p\n", rbdc);
493         spin_lock(&rbd_client_list_lock);
494         list_del(&rbdc->node);
495         spin_unlock(&rbd_client_list_lock);
496
497         ceph_destroy_client(rbdc->client);
498         kfree(rbdc);
499 }
500
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        /* Final put tears the client down via rbd_client_release() */
        kref_put(&rbdc->kref, rbd_client_release);
}
509
510 /*
511  * Destroy requests collection
512  */
513 static void rbd_coll_release(struct kref *kref)
514 {
515         struct rbd_req_coll *coll =
516                 container_of(kref, struct rbd_req_coll, kref);
517
518         dout("rbd_coll_release %p\n", coll);
519         kfree(coll);
520 }
521
522 static bool rbd_image_format_valid(u32 image_format)
523 {
524         return image_format == 1 || image_format == 2;
525 }
526
/*
 * Sanity-check an on-disk (format 1) image header before trusting it.
 * Returns false for a malformed header or one describing an image
 * this implementation cannot represent.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
565
566 /*
567  * Create a new header structure, translate header format from the on-disk
568  * header.
569  */
570 static int rbd_header_from_disk(struct rbd_image_header *header,
571                                  struct rbd_image_header_ondisk *ondisk)
572 {
573         u32 snap_count;
574         size_t len;
575         size_t size;
576         u32 i;
577
578         memset(header, 0, sizeof (*header));
579
580         snap_count = le32_to_cpu(ondisk->snap_count);
581
582         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
583         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
584         if (!header->object_prefix)
585                 return -ENOMEM;
586         memcpy(header->object_prefix, ondisk->object_prefix, len);
587         header->object_prefix[len] = '\0';
588
589         if (snap_count) {
590                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
591
592                 /* Save a copy of the snapshot names */
593
594                 if (snap_names_len > (u64) SIZE_MAX)
595                         return -EIO;
596                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
597                 if (!header->snap_names)
598                         goto out_err;
599                 /*
600                  * Note that rbd_dev_v1_header_read() guarantees
601                  * the ondisk buffer we're working with has
602                  * snap_names_len bytes beyond the end of the
603                  * snapshot id array, this memcpy() is safe.
604                  */
605                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
606                         snap_names_len);
607
608                 /* Record each snapshot's size */
609
610                 size = snap_count * sizeof (*header->snap_sizes);
611                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
612                 if (!header->snap_sizes)
613                         goto out_err;
614                 for (i = 0; i < snap_count; i++)
615                         header->snap_sizes[i] =
616                                 le64_to_cpu(ondisk->snaps[i].image_size);
617         } else {
618                 WARN_ON(ondisk->snap_names_len);
619                 header->snap_names = NULL;
620                 header->snap_sizes = NULL;
621         }
622
623         header->features = 0;   /* No features support in v1 images */
624         header->obj_order = ondisk->options.order;
625         header->crypt_type = ondisk->options.crypt_type;
626         header->comp_type = ondisk->options.comp_type;
627
628         /* Allocate and fill in the snapshot context */
629
630         header->image_size = le64_to_cpu(ondisk->image_size);
631         size = sizeof (struct ceph_snap_context);
632         size += snap_count * sizeof (header->snapc->snaps[0]);
633         header->snapc = kzalloc(size, GFP_KERNEL);
634         if (!header->snapc)
635                 goto out_err;
636
637         atomic_set(&header->snapc->nref, 1);
638         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
639         header->snapc->num_snaps = snap_count;
640         for (i = 0; i < snap_count; i++)
641                 header->snapc->snaps[i] =
642                         le64_to_cpu(ondisk->snaps[i].id);
643
644         return 0;
645
646 out_err:
647         kfree(header->snap_sizes);
648         header->snap_sizes = NULL;
649         kfree(header->snap_names);
650         header->snap_names = NULL;
651         kfree(header->object_prefix);
652         header->object_prefix = NULL;
653
654         return -ENOMEM;
655 }
656
657 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
658 {
659
660         struct rbd_snap *snap;
661
662         list_for_each_entry(snap, &rbd_dev->snaps, node) {
663                 if (!strcmp(snap_name, snap->name)) {
664                         rbd_dev->spec->snap_id = snap->id;
665                         rbd_dev->mapping.size = snap->size;
666                         rbd_dev->mapping.features = snap->features;
667
668                         return 0;
669                 }
670         }
671
672         return -ENOENT;
673 }
674
675 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
676 {
677         int ret;
678
679         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
680                     sizeof (RBD_SNAP_HEAD_NAME))) {
681                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
682                 rbd_dev->mapping.size = rbd_dev->header.image_size;
683                 rbd_dev->mapping.features = rbd_dev->header.features;
684                 ret = 0;
685         } else {
686                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
687                 if (ret < 0)
688                         goto done;
689                 rbd_dev->mapping.read_only = true;
690         }
691         rbd_dev->exists = true;
692 done:
693         return ret;
694 }
695
696 static void rbd_header_free(struct rbd_image_header *header)
697 {
698         kfree(header->object_prefix);
699         header->object_prefix = NULL;
700         kfree(header->snap_sizes);
701         header->snap_sizes = NULL;
702         kfree(header->snap_names);
703         header->snap_names = NULL;
704         ceph_put_snap_context(header->snapc);
705         header->snapc = NULL;
706 }
707
708 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
709 {
710         char *name;
711         u64 segment;
712         int ret;
713
714         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
715         if (!name)
716                 return NULL;
717         segment = offset >> rbd_dev->header.obj_order;
718         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
719                         rbd_dev->header.object_prefix, segment);
720         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
721                 pr_err("error formatting segment name for #%llu (%d)\n",
722                         segment, ret);
723                 kfree(name);
724                 name = NULL;
725         }
726
727         return name;
728 }
729
730 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
731 {
732         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
733
734         return offset & (segment_size - 1);
735 }
736
737 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
738                                 u64 offset, u64 length)
739 {
740         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
741
742         offset &= segment_size - 1;
743
744         rbd_assert(length <= U64_MAX - offset);
745         if (offset + length > segment_size)
746                 length = segment_size - offset;
747
748         return length;
749 }
750
/*
 * Number of segments the byte range [ofs, ofs + len) touches, or
 * -ERANGE if the range would wrap past the end of the u64 offset
 * space.  A zero-length range touches no segments.
 *
 * NOTE(review): the count is returned as an int; a very large range
 * with a small obj_order could overflow it — confirm callers keep
 * len bounded (e.g. to a single blk request).
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg;
        u64 end_seg;

        if (!len)
                return 0;
        if (len - 1 > U64_MAX - ofs)
                return -ERANGE;

        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;

        return end_seg - start_seg + 1;
}
767
768 /*
769  * returns the size of an object in the image
770  */
771 static u64 rbd_obj_bytes(struct rbd_image_header *header)
772 {
773         return 1 << header->obj_order;
774 }
775
776 /*
777  * bio helpers
778  */
779
780 static void bio_chain_put(struct bio *chain)
781 {
782         struct bio *tmp;
783
784         while (chain) {
785                 tmp = chain;
786                 chain = chain->bi_next;
787                 bio_put(tmp);
788         }
789 }
790
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain, tracking the running
 * byte position; once that position passes start_ofs, the remainder
 * of each segment is cleared.  Segments are mapped with
 * bvec_kmap_irq() before the memset.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;    /* byte offset from the start of the chain */

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /* zero from start_ofs (or segment start) to end */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
817
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * The clone shares the source's pages (BIO_CLONED is set): only the
 * bio_vec entries covering [offset, offset + len) are copied, with
 * the first and last entries trimmed to the requested byte range.
 * Returns NULL on invalid arguments or allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;              /* byte offset into the first segment */
        unsigned short end_idx;
        unsigned short vcnt;            /* segment count of the clone */
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        /* after the loop, resid = bytes of the last segment actually used */
        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                /* single segment: trim to exactly the requested bytes */
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
898
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;       /* tail link of the chain being built */

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi)
                        goto out_err;   /* EINVAL; ran out of bio's */
                /* clone as much of this bio as remains, capped at len */
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        /* consumed this bio entirely; move to the next */
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        /* undo the partial chain before reporting failure */
        bio_chain_put(chain);

        return NULL;
}
959
960 /*
961  * helpers for osd request op vectors.
962  */
963 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
964                                         int opcode, u32 payload_len)
965 {
966         struct ceph_osd_req_op *ops;
967
968         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
969         if (!ops)
970                 return NULL;
971
972         ops[0].op = opcode;
973
974         /*
975          * op extent offset and length will be set later on
976          * in calc_raw_layout()
977          */
978         ops[0].payload_len = payload_len;
979
980         return ops;
981 }
982
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
987
/*
 * Record the completion status of one sub-request (at "index") in the
 * collection, then complete, in order, the leading run of finished
 * sub-requests against the block-layer request.
 *
 * With no collection, the whole request is completed at once.  The
 * queue lock protects coll->status[] and coll->num_done; one kref is
 * dropped for every sub-request completed here.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* Not part of a collection: finish the request directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* Find how far the contiguous completed prefix now extends */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
1025
/*
 * Complete the collection slot associated with an rbd_request,
 * reporting its result code and byte count.
 */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
1031
1032 /*
1033  * Send ceph osd request
1034  */
1035 static int rbd_do_request(struct request *rq,
1036                           struct rbd_device *rbd_dev,
1037                           struct ceph_snap_context *snapc,
1038                           u64 snapid,
1039                           const char *object_name, u64 ofs, u64 len,
1040                           struct bio *bio,
1041                           struct page **pages,
1042                           int num_pages,
1043                           int flags,
1044                           struct ceph_osd_req_op *ops,
1045                           struct rbd_req_coll *coll,
1046                           int coll_index,
1047                           void (*rbd_cb)(struct ceph_osd_request *req,
1048                                          struct ceph_msg *msg),
1049                           struct ceph_osd_request **linger_req,
1050                           u64 *ver)
1051 {
1052         struct ceph_osd_request *req;
1053         struct ceph_file_layout *layout;
1054         int ret;
1055         u64 bno;
1056         struct timespec mtime = CURRENT_TIME;
1057         struct rbd_request *req_data;
1058         struct ceph_osd_request_head *reqhead;
1059         struct ceph_osd_client *osdc;
1060
1061         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1062         if (!req_data) {
1063                 if (coll)
1064                         rbd_coll_end_req_index(rq, coll, coll_index,
1065                                                -ENOMEM, len);
1066                 return -ENOMEM;
1067         }
1068
1069         if (coll) {
1070                 req_data->coll = coll;
1071                 req_data->coll_index = coll_index;
1072         }
1073
1074         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1075                 object_name, (unsigned long long) ofs,
1076                 (unsigned long long) len, coll, coll_index);
1077
1078         osdc = &rbd_dev->rbd_client->client->osdc;
1079         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1080                                         false, GFP_NOIO, pages, bio);
1081         if (!req) {
1082                 ret = -ENOMEM;
1083                 goto done_pages;
1084         }
1085
1086         req->r_callback = rbd_cb;
1087
1088         req_data->rq = rq;
1089         req_data->bio = bio;
1090         req_data->pages = pages;
1091         req_data->len = len;
1092
1093         req->r_priv = req_data;
1094
1095         reqhead = req->r_request->front.iov_base;
1096         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1097
1098         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1099         req->r_oid_len = strlen(req->r_oid);
1100
1101         layout = &req->r_file_layout;
1102         memset(layout, 0, sizeof(*layout));
1103         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1104         layout->fl_stripe_count = cpu_to_le32(1);
1105         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1106         layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
1107         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1108                                    req, ops);
1109         rbd_assert(ret == 0);
1110
1111         ceph_osdc_build_request(req, ofs, &len,
1112                                 ops,
1113                                 snapc,
1114                                 &mtime,
1115                                 req->r_oid, req->r_oid_len);
1116
1117         if (linger_req) {
1118                 ceph_osdc_set_request_linger(osdc, req);
1119                 *linger_req = req;
1120         }
1121
1122         ret = ceph_osdc_start_request(osdc, req, false);
1123         if (ret < 0)
1124                 goto done_err;
1125
1126         if (!rbd_cb) {
1127                 ret = ceph_osdc_wait_request(osdc, req);
1128                 if (ver)
1129                         *ver = le64_to_cpu(req->r_reassert_version.version);
1130                 dout("reassert_ver=%llu\n",
1131                         (unsigned long long)
1132                                 le64_to_cpu(req->r_reassert_version.version));
1133                 ceph_osdc_put_request(req);
1134         }
1135         return ret;
1136
1137 done_err:
1138         bio_chain_put(req_data->bio);
1139         ceph_osdc_put_request(req);
1140 done_pages:
1141         rbd_coll_end_req(req_data, ret, len);
1142         kfree(req_data);
1143         return ret;
1144 }
1145
/*
 * Ceph osd op callback
 *
 * Parses the osd reply.  For reads, data the osd did not supply
 * (nonexistent object, or a short read) is replaced with zeroes so
 * the caller sees a full-length result.  Completes the request's
 * collection slot and releases the per-request resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* op array follows the reply head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Reading an object that doesn't exist: return zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero-fill the tail beyond what the osd sent */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1185
1186 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1187 {
1188         ceph_osdc_put_request(req);
1189 }
1190
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector large enough for the transfer, issues the
 * request without a callback (so rbd_do_request() waits for it), and
 * for reads copies the result out of the page vector into "inbound".
 * Returns the request's result -- presumably the number of bytes
 * transferred, since it is used as the copy length below (TODO
 * confirm) -- or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* NULL callback: rbd_do_request() completes synchronously */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1234
1235 /*
1236  * Do an asynchronous ceph osd operation
1237  */
1238 static int rbd_do_op(struct request *rq,
1239                      struct rbd_device *rbd_dev,
1240                      struct ceph_snap_context *snapc,
1241                      u64 ofs, u64 len,
1242                      struct bio *bio,
1243                      struct rbd_req_coll *coll,
1244                      int coll_index)
1245 {
1246         char *seg_name;
1247         u64 seg_ofs;
1248         u64 seg_len;
1249         int ret;
1250         struct ceph_osd_req_op *ops;
1251         u32 payload_len;
1252         int opcode;
1253         int flags;
1254         u64 snapid;
1255
1256         seg_name = rbd_segment_name(rbd_dev, ofs);
1257         if (!seg_name)
1258                 return -ENOMEM;
1259         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1260         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1261
1262         if (rq_data_dir(rq) == WRITE) {
1263                 opcode = CEPH_OSD_OP_WRITE;
1264                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1265                 snapid = CEPH_NOSNAP;
1266                 payload_len = seg_len;
1267         } else {
1268                 opcode = CEPH_OSD_OP_READ;
1269                 flags = CEPH_OSD_FLAG_READ;
1270                 snapc = NULL;
1271                 snapid = rbd_dev->spec->snap_id;
1272                 payload_len = 0;
1273         }
1274
1275         ret = -ENOMEM;
1276         ops = rbd_create_rw_ops(1, opcode, payload_len);
1277         if (!ops)
1278                 goto done;
1279
1280         /* we've taken care of segment sizes earlier when we
1281            cloned the bios. We should never have a segment
1282            truncated at this point */
1283         rbd_assert(seg_len == len);
1284
1285         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1286                              seg_name, seg_ofs, seg_len,
1287                              bio,
1288                              NULL, 0,
1289                              flags,
1290                              ops,
1291                              coll, coll_index,
1292                              rbd_req_cb, 0, NULL);
1293
1294         rbd_destroy_ops(ops);
1295 done:
1296         kfree(seg_name);
1297         return ret;
1298 }
1299
1300 /*
1301  * Request sync osd read
1302  */
1303 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1304                           u64 snapid,
1305                           const char *object_name,
1306                           u64 ofs, u64 len,
1307                           char *buf,
1308                           u64 *ver)
1309 {
1310         struct ceph_osd_req_op *ops;
1311         int ret;
1312
1313         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1314         if (!ops)
1315                 return -ENOMEM;
1316
1317         ret = rbd_req_sync_op(rbd_dev, NULL,
1318                                snapid,
1319                                CEPH_OSD_FLAG_READ,
1320                                ops, object_name, ofs, len, buf, NULL, ver);
1321         rbd_destroy_ops(ops);
1322
1323         return ret;
1324 }
1325
/*
 * Acknowledge a watch notification on the header object, so the osd
 * can stop re-sending it.  Despite the name, the ack is submitted
 * asynchronously (rbd_simple_req_cb); the return value only reflects
 * the submission.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): ver is byte-swapped above but notify_id is
	 * stored as-is -- confirm notify_id is already in the on-wire
	 * byte order here.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1355
1356 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1357 {
1358         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1359         u64 hver;
1360         int rc;
1361
1362         if (!rbd_dev)
1363                 return;
1364
1365         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1366                 rbd_dev->header_name, (unsigned long long) notify_id,
1367                 (unsigned int) opcode);
1368         rc = rbd_dev_refresh(rbd_dev, &hver);
1369         if (rc)
1370                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1371                            " update snaps: %d\n", rbd_dev->major, rc);
1372
1373         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1374 }
1375
/*
 * Request sync osd watch
 *
 * Register a watch on the header object so the osd notifies us of
 * header changes (delivered to rbd_watch_cb()).  The watch request
 * lingers; a pointer to it is stored in rbd_dev->watch_request so it
 * can be torn down by rbd_req_sync_unwatch().
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	/* The event routes osd notifications to rbd_watch_cb() */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* non-zero: establish (unwatch uses 0) */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1419
/*
 * Request sync osd unwatch
 *
 * Tear down the watch established by rbd_req_sync_watch() and
 * release the associated osd event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* zero: remove the watch (watch uses 1) */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	/* The event is cancelled even if the unwatch request failed */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1449
/*
 * Synchronous osd object method call
 *
 * Invoke class_name.method_name on the named object, sending
 * outbound/outbound_size as input and receiving up to inbound_size
 * bytes of reply data in inbound.  Returns the value of
 * rbd_req_sync_op() -- presumably the byte count transferred, or a
 * negative errno.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       flags, ops,
			       object_name, 0, inbound_size, inbound,
			       NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1502
1503 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1504 {
1505         struct rbd_req_coll *coll =
1506                         kzalloc(sizeof(struct rbd_req_coll) +
1507                                 sizeof(struct rbd_req_status) * num_reqs,
1508                                 GFP_ATOMIC);
1509
1510         if (!coll)
1511                 return NULL;
1512         coll->total = num_reqs;
1513         kref_init(&coll->kref);
1514         return coll;
1515 }
1516
/*
 * block device queue callback
 *
 * Called by the block layer with q->queue_lock held.  Each fetched
 * request is split along rbd object boundaries and one osd request
 * is issued per segment, with completions gathered in order by a
 * struct rbd_req_coll.  The queue lock is dropped while segments are
 * submitted and re-taken before fetching the next request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		bool do_write;
		unsigned int size;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;
		unsigned int bio_offset;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* Drop the queue lock for the potentially blocking work */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (!rbd_dev->exists) {
			/* The mapped snapshot has gone away beneath us */
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* Pin the snap context this whole request is issued under */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		bio = rq->bio;

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		bio_offset = 0;
		do {
			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
			unsigned int chain_size;
			struct bio *bio_chain;

			BUG_ON(limit > (u64) UINT_MAX);
			chain_size = (unsigned int) limit;
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

			/* One collection reference per in-flight segment */
			kref_get(&coll->kref);

			/* Pass a cloned bio chain via an osd request */

			bio_chain = bio_chain_clone_range(&bio,
						&bio_offset, chain_size,
						GFP_ATOMIC);
			if (bio_chain)
				(void) rbd_do_op(rq, rbd_dev, snapc,
						ofs, chain_size,
						bio_chain, coll, cur_seg);
			else
				/* Clone failed; fail just this segment */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, chain_size);
			size -= chain_size;
			ofs += chain_size;

			cur_seg++;
		} while (size > 0);
		/* Drop the initial reference from rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1626
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of bvec that may be merged into the
 * bio described by bmd (possibly 0, and at most bvec->bv_len).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
1672
1673 static void rbd_free_disk(struct rbd_device *rbd_dev)
1674 {
1675         struct gendisk *disk = rbd_dev->disk;
1676
1677         if (!disk)
1678                 return;
1679
1680         if (disk->flags & GENHD_FL_UP)
1681                 del_gendisk(disk);
1682         if (disk->queue)
1683                 blk_cleanup_queue(disk->queue);
1684         put_disk(disk);
1685 }
1686
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* No-op on the first pass (ondisk is NULL) */
		kfree(ondisk);

		/* First pass reads just the fixed-size portion
		 * (snap_count and names_size start at zero) */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->spec->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->spec->image_name);
			goto out_err;
		}

		/* Re-read if the snapshot count changed underneath us */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1758
1759 /*
1760  * reload the ondisk the header
1761  */
1762 static int rbd_read_header(struct rbd_device *rbd_dev,
1763                            struct rbd_image_header *header)
1764 {
1765         struct rbd_image_header_ondisk *ondisk;
1766         u64 ver = 0;
1767         int ret;
1768
1769         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1770         if (IS_ERR(ondisk))
1771                 return PTR_ERR(ondisk);
1772         ret = rbd_header_from_disk(header, ondisk);
1773         if (ret >= 0)
1774                 header->obj_version = ver;
1775         kfree(ondisk);
1776
1777         return ret;
1778 }
1779
/* Tear down every snapshot device attached to this rbd device. */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe variant: each entry is removed while we iterate */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}
1788
/*
 * Propagate a changed image size to the mapping and the gendisk
 * capacity.  Only applies when the base image is mapped
 * (CEPH_NOSNAP); a mapped snapshot's size never changes.
 */
static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;

	/* Convert the byte-granular image size into 512-byte sectors */
	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}
1801
1802 /*
1803  * only read the first part of the ondisk header, without the snaps info
1804  */
1805 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1806 {
1807         int ret;
1808         struct rbd_image_header h;
1809
1810         ret = rbd_read_header(rbd_dev, &h);
1811         if (ret < 0)
1812                 return ret;
1813
1814         down_write(&rbd_dev->header_rwsem);
1815
1816         /* Update image size, and check for resize of mapped image */
1817         rbd_dev->header.image_size = h.image_size;
1818         rbd_update_mapping_size(rbd_dev);
1819
1820         /* rbd_dev->header.object_prefix shouldn't change */
1821         kfree(rbd_dev->header.snap_sizes);
1822         kfree(rbd_dev->header.snap_names);
1823         /* osd requests may still refer to snapc */
1824         ceph_put_snap_context(rbd_dev->header.snapc);
1825
1826         if (hver)
1827                 *hver = h.obj_version;
1828         rbd_dev->header.obj_version = h.obj_version;
1829         rbd_dev->header.image_size = h.image_size;
1830         rbd_dev->header.snapc = h.snapc;
1831         rbd_dev->header.snap_names = h.snap_names;
1832         rbd_dev->header.snap_sizes = h.snap_sizes;
1833         /* Free the extra copy of the object prefix */
1834         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1835         kfree(h.object_prefix);
1836
1837         ret = rbd_dev_snaps_update(rbd_dev);
1838         if (!ret)
1839                 ret = rbd_dev_snaps_register(rbd_dev);
1840
1841         up_write(&rbd_dev->header_rwsem);
1842
1843         return ret;
1844 }
1845
1846 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1847 {
1848         int ret;
1849
1850         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1851         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1852         if (rbd_dev->image_format == 1)
1853                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1854         else
1855                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1856         mutex_unlock(&ctl_mutex);
1857
1858         return ret;
1859 }
1860
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device.  The disk is not activated here (no add_disk() call); the
 * caller does that.  Returns 0 on success, -ENOMEM on failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;	/* bytes per backing rados object */

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* mapping.size is in bytes; capacity is in 512-byte sectors */
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	/* Only the blk_init_queue() failure path reaches here */
	return -ENOMEM;
}
1909
1910 /*
1911   sysfs
1912 */
1913
1914 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1915 {
1916         return container_of(dev, struct rbd_device, dev);
1917 }
1918
1919 static ssize_t rbd_size_show(struct device *dev,
1920                              struct device_attribute *attr, char *buf)
1921 {
1922         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1923         sector_t size;
1924
1925         down_read(&rbd_dev->header_rwsem);
1926         size = get_capacity(rbd_dev->disk);
1927         up_read(&rbd_dev->header_rwsem);
1928
1929         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1930 }
1931
1932 /*
1933  * Note this shows the features for whatever's mapped, which is not
1934  * necessarily the base image.
1935  */
1936 static ssize_t rbd_features_show(struct device *dev,
1937                              struct device_attribute *attr, char *buf)
1938 {
1939         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1940
1941         return sprintf(buf, "0x%016llx\n",
1942                         (unsigned long long) rbd_dev->mapping.features);
1943 }
1944
1945 static ssize_t rbd_major_show(struct device *dev,
1946                               struct device_attribute *attr, char *buf)
1947 {
1948         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1949
1950         return sprintf(buf, "%d\n", rbd_dev->major);
1951 }
1952
1953 static ssize_t rbd_client_id_show(struct device *dev,
1954                                   struct device_attribute *attr, char *buf)
1955 {
1956         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1957
1958         return sprintf(buf, "client%lld\n",
1959                         ceph_client_id(rbd_dev->rbd_client->client));
1960 }
1961
1962 static ssize_t rbd_pool_show(struct device *dev,
1963                              struct device_attribute *attr, char *buf)
1964 {
1965         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1966
1967         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1968 }
1969
1970 static ssize_t rbd_pool_id_show(struct device *dev,
1971                              struct device_attribute *attr, char *buf)
1972 {
1973         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1974
1975         return sprintf(buf, "%llu\n",
1976                 (unsigned long long) rbd_dev->spec->pool_id);
1977 }
1978
1979 static ssize_t rbd_name_show(struct device *dev,
1980                              struct device_attribute *attr, char *buf)
1981 {
1982         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1983
1984         return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
1985 }
1986
1987 static ssize_t rbd_image_id_show(struct device *dev,
1988                              struct device_attribute *attr, char *buf)
1989 {
1990         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1991
1992         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
1993 }
1994
1995 /*
1996  * Shows the name of the currently-mapped snapshot (or
1997  * RBD_SNAP_HEAD_NAME for the base image).
1998  */
1999 static ssize_t rbd_snap_show(struct device *dev,
2000                              struct device_attribute *attr,
2001                              char *buf)
2002 {
2003         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2004
2005         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2006 }
2007
2008 static ssize_t rbd_image_refresh(struct device *dev,
2009                                  struct device_attribute *attr,
2010                                  const char *buf,
2011                                  size_t size)
2012 {
2013         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2014         int ret;
2015
2016         ret = rbd_dev_refresh(rbd_dev, NULL);
2017
2018         return ret < 0 ? ret : size;
2019 }
2020
/* Per-device sysfs attributes; all read-only except "refresh" */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2031
/* NULL-terminated attribute list backing the rbd device's sysfs group */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};
2045
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list attached via the device_type below */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* Empty release: the rbd_device's lifetime is managed elsewhere */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2064
2065
2066 /*
2067   sysfs - snapshots
2068 */
2069
2070 static ssize_t rbd_snap_size_show(struct device *dev,
2071                                   struct device_attribute *attr,
2072                                   char *buf)
2073 {
2074         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2075
2076         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2077 }
2078
2079 static ssize_t rbd_snap_id_show(struct device *dev,
2080                                 struct device_attribute *attr,
2081                                 char *buf)
2082 {
2083         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2084
2085         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2086 }
2087
2088 static ssize_t rbd_snap_features_show(struct device *dev,
2089                                 struct device_attribute *attr,
2090                                 char *buf)
2091 {
2092         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2093
2094         return sprintf(buf, "0x%016llx\n",
2095                         (unsigned long long) snap->features);
2096 }
2097
/* Per-snapshot sysfs attributes (all read-only) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2112
2113 static void rbd_snap_dev_release(struct device *dev)
2114 {
2115         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2116         kfree(snap->name);
2117         kfree(snap);
2118 }
2119
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* device_type for snapshot devices; release frees the rbd_snap */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2129
2130 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2131 {
2132         kref_get(&spec->kref);
2133
2134         return spec;
2135 }
2136
2137 static void rbd_spec_free(struct kref *kref);
2138 static void rbd_spec_put(struct rbd_spec *spec)
2139 {
2140         if (spec)
2141                 kref_put(&spec->kref, rbd_spec_free);
2142 }
2143
/*
 * Allocate a zeroed rbd_spec with a single reference held by the
 * caller.  Returns NULL on allocation failure.
 */
static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	/* Exercise get/put once; net refcount change is zero */
	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}
2157
2158 static void rbd_spec_free(struct kref *kref)
2159 {
2160         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2161
2162         kfree(spec->pool_name);
2163         kfree(spec->image_id);
2164         kfree(spec->image_name);
2165         kfree(spec->snap_name);
2166         kfree(spec);
2167 }
2168
2169 static bool rbd_snap_registered(struct rbd_snap *snap)
2170 {
2171         bool ret = snap->dev.type == &rbd_snap_device_type;
2172         bool reg = device_is_registered(&snap->dev);
2173
2174         rbd_assert(!ret ^ reg);
2175
2176         return ret;
2177 }
2178
/*
 * Unlink a snapshot from its rbd device's list and, if its sysfs
 * device was registered, unregister it.  Dropping the last device
 * reference frees the snap via rbd_snap_dev_release().
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2185
2186 static int rbd_register_snap_dev(struct rbd_snap *snap,
2187                                   struct device *parent)
2188 {
2189         struct device *dev = &snap->dev;
2190         int ret;
2191
2192         dev->type = &rbd_snap_device_type;
2193         dev->parent = parent;
2194         dev->release = rbd_snap_dev_release;
2195         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2196         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2197
2198         ret = device_register(dev);
2199
2200         return ret;
2201 }
2202
2203 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2204                                                 const char *snap_name,
2205                                                 u64 snap_id, u64 snap_size,
2206                                                 u64 snap_features)
2207 {
2208         struct rbd_snap *snap;
2209         int ret;
2210
2211         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2212         if (!snap)
2213                 return ERR_PTR(-ENOMEM);
2214
2215         ret = -ENOMEM;
2216         snap->name = kstrdup(snap_name, GFP_KERNEL);
2217         if (!snap->name)
2218                 goto err;
2219
2220         snap->id = snap_id;
2221         snap->size = snap_size;
2222         snap->features = snap_features;
2223
2224         return snap;
2225
2226 err:
2227         kfree(snap->name);
2228         kfree(snap);
2229
2230         return ERR_PTR(ret);
2231 }
2232
2233 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2234                 u64 *snap_size, u64 *snap_features)
2235 {
2236         char *snap_name;
2237
2238         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2239
2240         *snap_size = rbd_dev->header.snap_sizes[which];
2241         *snap_features = 0;     /* No features for v1 */
2242
2243         /* Skip over names until we find the one we are looking for */
2244
2245         snap_name = rbd_dev->header.snap_names;
2246         while (which--)
2247                 snap_name += strlen(snap_name) + 1;
2248
2249         return snap_name;
2250 }
2251
2252 /*
2253  * Get the size and object order for an image snapshot, or if
2254  * snap_id is CEPH_NOSNAP, gets this information for the base
2255  * image.
2256  */
2257 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2258                                 u8 *order, u64 *snap_size)
2259 {
2260         __le64 snapid = cpu_to_le64(snap_id);
2261         int ret;
2262         struct {
2263                 u8 order;
2264                 __le64 size;
2265         } __attribute__ ((packed)) size_buf = { 0 };
2266
2267         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2268                                 "rbd", "get_size",
2269                                 (char *) &snapid, sizeof (snapid),
2270                                 (char *) &size_buf, sizeof (size_buf),
2271                                 CEPH_OSD_FLAG_READ, NULL);
2272         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2273         if (ret < 0)
2274                 return ret;
2275
2276         *order = size_buf.order;
2277         *snap_size = le64_to_cpu(size_buf.size);
2278
2279         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2280                 (unsigned long long) snap_id, (unsigned int) *order,
2281                 (unsigned long long) *snap_size);
2282
2283         return 0;
2284 }
2285
/*
 * Fetch size and object order for the base image (CEPH_NOSNAP)
 * straight into the in-memory header.
 */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
2292
/*
 * Fetch the image's object name prefix via the "get_object_prefix"
 * class method and store an allocated copy in the in-memory header.
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	p = reply_buf;
	/* Makes its own copy of the string; reply_buf is freed below */
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2330
/*
 * Get the feature bits for an image snapshot (or, with CEPH_NOSNAP,
 * for the base image) via the "get_features" class method.
 * Refuses to proceed when the image requires an incompatible
 * feature this client does not implement.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	/* On-wire reply layout of the "get_features" class method */
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	/* NOTE(review): -ENOTSUPP is kernel-internal; -EOPNOTSUPP is
	 * what userland normally sees -- confirm before changing */
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENOTSUPP;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2364
/* Fetch the base image's feature bits into the in-memory header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
2370
2371 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2372 {
2373         size_t size;
2374         int ret;
2375         void *reply_buf;
2376         void *p;
2377         void *end;
2378         u64 seq;
2379         u32 snap_count;
2380         struct ceph_snap_context *snapc;
2381         u32 i;
2382
2383         /*
2384          * We'll need room for the seq value (maximum snapshot id),
2385          * snapshot count, and array of that many snapshot ids.
2386          * For now we have a fixed upper limit on the number we're
2387          * prepared to receive.
2388          */
2389         size = sizeof (__le64) + sizeof (__le32) +
2390                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2391         reply_buf = kzalloc(size, GFP_KERNEL);
2392         if (!reply_buf)
2393                 return -ENOMEM;
2394
2395         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2396                                 "rbd", "get_snapcontext",
2397                                 NULL, 0,
2398                                 reply_buf, size,
2399                                 CEPH_OSD_FLAG_READ, ver);
2400         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2401         if (ret < 0)
2402                 goto out;
2403
2404         ret = -ERANGE;
2405         p = reply_buf;
2406         end = (char *) reply_buf + size;
2407         ceph_decode_64_safe(&p, end, seq, out);
2408         ceph_decode_32_safe(&p, end, snap_count, out);
2409
2410         /*
2411          * Make sure the reported number of snapshot ids wouldn't go
2412          * beyond the end of our buffer.  But before checking that,
2413          * make sure the computed size of the snapshot context we
2414          * allocate is representable in a size_t.
2415          */
2416         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2417                                  / sizeof (u64)) {
2418                 ret = -EINVAL;
2419                 goto out;
2420         }
2421         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2422                 goto out;
2423
2424         size = sizeof (struct ceph_snap_context) +
2425                                 snap_count * sizeof (snapc->snaps[0]);
2426         snapc = kmalloc(size, GFP_KERNEL);
2427         if (!snapc) {
2428                 ret = -ENOMEM;
2429                 goto out;
2430         }
2431
2432         atomic_set(&snapc->nref, 1);
2433         snapc->seq = seq;
2434         snapc->num_snaps = snap_count;
2435         for (i = 0; i < snap_count; i++)
2436                 snapc->snaps[i] = ceph_decode_64(&p);
2437
2438         rbd_dev->header.snapc = snapc;
2439
2440         dout("  snap context seq = %llu, snap_count = %u\n",
2441                 (unsigned long long) seq, (unsigned int) snap_count);
2442
2443 out:
2444         kfree(reply_buf);
2445
2446         return 0;
2447 }
2448
/*
 * Fetch the name of the snapshot at position "which" in the snap
 * context via the "get_snapshot_name" class method.  Returns an
 * allocated, NUL-terminated string the caller must kfree(), or an
 * ERR_PTR on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	/* Room for a length-prefixed name of maximum length */
	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	/* Allocates its own copy of the encoded string (or ERR_PTR) */
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
2492
2493 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2494                 u64 *snap_size, u64 *snap_features)
2495 {
2496         __le64 snap_id;
2497         u8 order;
2498         int ret;
2499
2500         snap_id = rbd_dev->header.snapc->snaps[which];
2501         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2502         if (ret)
2503                 return ERR_PTR(ret);
2504         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2505         if (ret)
2506                 return ERR_PTR(ret);
2507
2508         return rbd_dev_v2_snap_name(rbd_dev, which);
2509 }
2510
2511 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2512                 u64 *snap_size, u64 *snap_features)
2513 {
2514         if (rbd_dev->image_format == 1)
2515                 return rbd_dev_v1_snap_info(rbd_dev, which,
2516                                         snap_size, snap_features);
2517         if (rbd_dev->image_format == 2)
2518                 return rbd_dev_v2_snap_info(rbd_dev, which,
2519                                         snap_size, snap_features);
2520         return ERR_PTR(-EINVAL);
2521 }
2522
2523 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2524 {
2525         int ret;
2526         __u8 obj_order;
2527
2528         down_write(&rbd_dev->header_rwsem);
2529
2530         /* Grab old order first, to see if it changes */
2531
2532         obj_order = rbd_dev->header.obj_order,
2533         ret = rbd_dev_v2_image_size(rbd_dev);
2534         if (ret)
2535                 goto out;
2536         if (rbd_dev->header.obj_order != obj_order) {
2537                 ret = -EIO;
2538                 goto out;
2539         }
2540         rbd_update_mapping_size(rbd_dev);
2541
2542         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2543         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2544         if (ret)
2545                 goto out;
2546         ret = rbd_dev_snaps_update(rbd_dev);
2547         dout("rbd_dev_snaps_update returned %d\n", ret);
2548         if (ret)
2549                 goto out;
2550         ret = rbd_dev_snaps_register(rbd_dev);
2551         dout("rbd_dev_snaps_register returned %d\n", ret);
2552 out:
2553         up_write(&rbd_dev->header_rwsem);
2554
2555         return ret;
2556 }
2557
2558 /*
2559  * Scan the rbd device's current snapshot list and compare it to the
2560  * newly-received snapshot context.  Remove any existing snapshots
2561  * not present in the new snapshot context.  Add a new snapshot for
2562  * any snaphots in the snapshot context not in the current list.
2563  * And verify there are no changes to snapshots we already know
2564  * about.
2565  *
2566  * Assumes the snapshots in the snapshot context are sorted by
2567  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2568  * are also maintained in that order.)
2569  */
2570 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2571 {
2572         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2573         const u32 snap_count = snapc->num_snaps;
2574         struct list_head *head = &rbd_dev->snaps;
2575         struct list_head *links = head->next;
2576         u32 index = 0;
2577
2578         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2579         while (index < snap_count || links != head) {
2580                 u64 snap_id;
2581                 struct rbd_snap *snap;
2582                 char *snap_name;
2583                 u64 snap_size = 0;
2584                 u64 snap_features = 0;
2585
2586                 snap_id = index < snap_count ? snapc->snaps[index]
2587                                              : CEPH_NOSNAP;
2588                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2589                                      : NULL;
2590                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2591
2592                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2593                         struct list_head *next = links->next;
2594
2595                         /* Existing snapshot not in the new snap context */
2596
2597                         if (rbd_dev->spec->snap_id == snap->id)
2598                                 rbd_dev->exists = false;
2599                         rbd_remove_snap_dev(snap);
2600                         dout("%ssnap id %llu has been removed\n",
2601                                 rbd_dev->spec->snap_id == snap->id ?
2602                                                         "mapped " : "",
2603                                 (unsigned long long) snap->id);
2604
2605                         /* Done with this list entry; advance */
2606
2607                         links = next;
2608                         continue;
2609                 }
2610
2611                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2612                                         &snap_size, &snap_features);
2613                 if (IS_ERR(snap_name))
2614                         return PTR_ERR(snap_name);
2615
2616                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2617                         (unsigned long long) snap_id);
2618                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2619                         struct rbd_snap *new_snap;
2620
2621                         /* We haven't seen this snapshot before */
2622
2623                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2624                                         snap_id, snap_size, snap_features);
2625                         if (IS_ERR(new_snap)) {
2626                                 int err = PTR_ERR(new_snap);
2627
2628                                 dout("  failed to add dev, error %d\n", err);
2629
2630                                 return err;
2631                         }
2632
2633                         /* New goes before existing, or at end of list */
2634
2635                         dout("  added dev%s\n", snap ? "" : " at end\n");
2636                         if (snap)
2637                                 list_add_tail(&new_snap->node, &snap->node);
2638                         else
2639                                 list_add_tail(&new_snap->node, head);
2640                 } else {
2641                         /* Already have this one */
2642
2643                         dout("  already present\n");
2644
2645                         rbd_assert(snap->size == snap_size);
2646                         rbd_assert(!strcmp(snap->name, snap_name));
2647                         rbd_assert(snap->features == snap_features);
2648
2649                         /* Done with this list entry; advance */
2650
2651                         links = links->next;
2652                 }
2653
2654                 /* Advance to the next entry in the snapshot context */
2655
2656                 index++;
2657         }
2658         dout("%s: done\n", __func__);
2659
2660         return 0;
2661 }
2662
2663 /*
2664  * Scan the list of snapshots and register the devices for any that
2665  * have not already been registered.
2666  */
2667 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2668 {
2669         struct rbd_snap *snap;
2670         int ret = 0;
2671
2672         dout("%s called\n", __func__);
2673         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2674                 return -EIO;
2675
2676         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2677                 if (!rbd_snap_registered(snap)) {
2678                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2679                         if (ret < 0)
2680                                 break;
2681                 }
2682         }
2683         dout("%s: returning %d\n", __func__, ret);
2684
2685         return ret;
2686 }
2687
2688 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2689 {
2690         struct device *dev;
2691         int ret;
2692
2693         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2694
2695         dev = &rbd_dev->dev;
2696         dev->bus = &rbd_bus_type;
2697         dev->type = &rbd_device_type;
2698         dev->parent = &rbd_root_dev;
2699         dev->release = rbd_dev_release;
2700         dev_set_name(dev, "%d", rbd_dev->dev_id);
2701         ret = device_register(dev);
2702
2703         mutex_unlock(&ctl_mutex);
2704
2705         return ret;
2706 }
2707
/* Remove the rbd device from sysfs (pairs with rbd_bus_add_dev()). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2712
2713 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2714 {
2715         int ret, rc;
2716
2717         do {
2718                 ret = rbd_req_sync_watch(rbd_dev);
2719                 if (ret == -ERANGE) {
2720                         rc = rbd_dev_refresh(rbd_dev, NULL);
2721                         if (rc < 0)
2722                                 return rc;
2723                 }
2724         } while (ret == -ERANGE);
2725
2726         return ret;
2727 }
2728
/* Highest device id handed out so far; see rbd_dev_id_get()/_put() */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() makes the id unique without a lock */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
2745
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.  If the id being released was the
 * current maximum, rbd_dev_id_max is recomputed from the remaining
 * devices so ids can eventually be reused.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	/* NOTE(review): printed even when the cmpxchg did not swap */
	dout("  max dev id has been reset\n");
}
2796
/*
 * Advance *buf past any leading white space and return the length
 * of the token (run of non-white-space characters) that follows.
 * *buf must be NUL-terminated; it is left pointing at the first
 * non-space character (or the terminating '\0').
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * Exactly the characters for which isspace() is nonzero in
	 * the "C" and "POSIX" locales.
	 */
	static const char delims[] = " \f\n\r\t\v";
	size_t skipped = strspn(*buf, delims);

	*buf += skipped;		/* now at start of token (or '\0') */

	return strcspn(*buf, delims);
}
2815
/*
 * Find the next token in *buf and, when the supplied buffer can
 * hold it, copy it there with a guaranteed '\0' terminator.  *buf
 * must be NUL-terminated on entry.
 *
 * Returns the length of the token found (excluding the '\0').  A
 * return of 0 means no token was found; a return >= token_size
 * means the token did not fit and was not copied.
 *
 * *buf is advanced past the token even when the token was too big
 * to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;

	return token_len;
}
2845
2846 /*
2847  * Finds the next token in *buf, dynamically allocates a buffer big
2848  * enough to hold a copy of it, and copies the token into the new
2849  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2850  * that a duplicate buffer is created even for a zero-length token.
2851  *
2852  * Returns a pointer to the newly-allocated duplicate, or a null
2853  * pointer if memory for the duplicate was not available.  If
2854  * the lenp argument is a non-null pointer, the length of the token
2855  * (not including the '\0') is returned in *lenp.
2856  *
2857  * If successful, the *buf pointer will be updated to point beyond
2858  * the end of the found token.
2859  *
2860  * Note: uses GFP_KERNEL for allocation.
2861  */
2862 static inline char *dup_token(const char **buf, size_t *lenp)
2863 {
2864         char *dup;
2865         size_t len;
2866
2867         len = next_token(buf);
2868         dup = kmalloc(len + 1, GFP_KERNEL);
2869         if (!dup)
2870                 return NULL;
2871
2872         memcpy(dup, *buf, len);
2873         *(dup + len) = '\0';
2874         *buf += len;
2875
2876         if (lenp)
2877                 *lenp = len;
2878
2879         return dup;
2880 }
2881
2882 /*
2883  * Parse the options provided for an "rbd add" (i.e., rbd image
2884  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
2885  * and the data written is passed here via a NUL-terminated buffer.
2886  * Returns 0 if successful or an error code otherwise.
2887  *
2888  * The information extracted from these options is recorded in
2889  * the other parameters which return dynamically-allocated
2890  * structures:
2891  *  ceph_opts
2892  *      The address of a pointer that will refer to a ceph options
2893  *      structure.  Caller must release the returned pointer using
2894  *      ceph_destroy_options() when it is no longer needed.
2895  *  rbd_opts
2896  *      Address of an rbd options pointer.  Fully initialized by
2897  *      this function; caller must release with kfree().
2898  *  spec
2899  *      Address of an rbd image specification pointer.  Fully
2900  *      initialized by this function based on parsed options.
2901  *      Caller must release with rbd_spec_put().
2902  *
2903  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
2905  * where:
2906  *  <mon_addrs>
2907  *      A comma-separated list of one or more monitor addresses.
2908  *      A monitor address is an ip address, optionally followed
2909  *      by a port number (separated by a colon).
2910  *        I.e.:  ip1[:port1][,ip2[:port2]...]
2911  *  <options>
2912  *      A comma-separated list of ceph and/or rbd options.
2913  *  <pool_name>
2914  *      The name of the rados pool containing the rbd image.
2915  *  <image_name>
2916  *      The name of the image in that pool to map.
 *  <snap_name>
 *      The name of an optional snapshot.  If provided, the mapping
 *      will present data from the image at the time that snapshot
 *      was created.  The image head is used if no snapshot name
 *      is provided.  Snapshot mappings are always read-only.
2922  */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;	/* Missing monitor address(es) */
	/* Not duplicated; ceph_parse_options() reads them in place below */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options)
		goto out_err;	/* Missing options */

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name)
		goto out_err;	/* Missing pool name */

	spec->image_name = dup_token(&buf, &spec->image_name_len);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name)
		goto out_err;	/* Missing image name */

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	memcpy(spec->snap_name, buf, len);
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	/*
	 * rbd-specific tokens in the options string are dispatched
	 * to parse_rbd_opts_token() and recorded in rbd_opts; the
	 * rest populate the returned ceph options structure.
	 */
	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);	/* no longer needed once parsed */

	/* Success; ownership of all three structures passes to the caller */
	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
3018
3019 /*
3020  * An rbd format 2 image has a unique identifier, distinct from the
3021  * name given to it by the user.  Internally, that identifier is
3022  * what's used to specify the names of objects related to the image.
3023  *
3024  * A special "rbd id" object is used to map an rbd image name to its
3025  * id.  If that object doesn't exist, then there is no v2 rbd image
3026  * with the supplied name.
3027  *
3028  * This function will record the given rbd_dev's image_id field if
3029  * it can be determined, and in that case will return 0.  If any
3030  * errors occur a negative errno will be returned and the rbd_dev's
3031  * image_id field will be unchanged (and should be NULL).
3032  */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Invoke the "get_id" class method on the rbd id object */
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->spec->image_id_len,
						GFP_NOIO);
	/* Per the contract above, leave image_id NULL on any error */
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
3088
3089 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3090 {
3091         int ret;
3092         size_t size;
3093
3094         /* Version 1 images have no id; empty string is used */
3095
3096         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3097         if (!rbd_dev->spec->image_id)
3098                 return -ENOMEM;
3099         rbd_dev->spec->image_id_len = 0;
3100
3101         /* Record the header object name for this rbd image. */
3102
3103         size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3104         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3105         if (!rbd_dev->header_name) {
3106                 ret = -ENOMEM;
3107                 goto out_err;
3108         }
3109         sprintf(rbd_dev->header_name, "%s%s",
3110                 rbd_dev->spec->image_name, RBD_SUFFIX);
3111
3112         /* Populate rbd image metadata */
3113
3114         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3115         if (ret < 0)
3116                 goto out_err;
3117         rbd_dev->image_format = 1;
3118
3119         dout("discovered version 1 image, header name is %s\n",
3120                 rbd_dev->header_name);
3121
3122         return 0;
3123
3124 out_err:
3125         kfree(rbd_dev->header_name);
3126         rbd_dev->header_name = NULL;
3127         kfree(rbd_dev->spec->image_id);
3128         rbd_dev->spec->image_id = NULL;
3129
3130         return ret;
3131 }
3132
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Release everything allocated above before reporting failure */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3194
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to read the image id object first.  Only format 2
	 * images have one, so any failure here (typically ENOENT)
	 * means the image is treated as format 1.
	 */
	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3219
3220 static ssize_t rbd_add(struct bus_type *bus,
3221                        const char *buf,
3222                        size_t count)
3223 {
3224         struct rbd_device *rbd_dev = NULL;
3225         struct ceph_options *ceph_opts = NULL;
3226         struct rbd_options *rbd_opts = NULL;
3227         struct rbd_spec *spec = NULL;
3228         struct rbd_client *rbdc;
3229         struct ceph_osd_client *osdc;
3230         int rc = -ENOMEM;
3231
3232         if (!try_module_get(THIS_MODULE))
3233                 return -ENODEV;
3234
3235         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3236         if (!rbd_dev)
3237                 return -ENOMEM;
3238
3239         /* static rbd_device initialization */
3240         spin_lock_init(&rbd_dev->lock);
3241         INIT_LIST_HEAD(&rbd_dev->node);
3242         INIT_LIST_HEAD(&rbd_dev->snaps);
3243         init_rwsem(&rbd_dev->header_rwsem);
3244
3245         /* parse add command */
3246         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3247         if (rc < 0)
3248                 goto err_out_mem;
3249
3250         rbd_dev->mapping.read_only = rbd_opts->read_only;
3251
3252         rbdc = rbd_get_client(ceph_opts);
3253         if (IS_ERR(rbdc)) {
3254                 rc = PTR_ERR(rbdc);
3255                 goto err_out_args;
3256         }
3257         rbd_dev->rbd_client = rbdc;
3258         ceph_opts = NULL;       /* ceph_opts now owned by rbd_dev client */
3259
3260         /* pick the pool */
3261         osdc = &rbdc->client->osdc;
3262         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3263         if (rc < 0)
3264                 goto err_out_client;
3265         spec->pool_id = (u64) rc;
3266
3267         rbd_dev->spec = spec;
3268
3269         rc = rbd_dev_probe(rbd_dev);
3270         if (rc < 0)
3271                 goto err_out_client;
3272
3273         /* no need to lock here, as rbd_dev is not registered yet */
3274         rc = rbd_dev_snaps_update(rbd_dev);
3275         if (rc)
3276                 goto err_out_probe;
3277
3278         rc = rbd_dev_set_mapping(rbd_dev);
3279         if (rc)
3280                 goto err_out_snaps;
3281
3282         /* generate unique id: find highest unique id, add one */
3283         rbd_dev_id_get(rbd_dev);
3284
3285         /* Fill in the device name, now that we have its id. */
3286         BUILD_BUG_ON(DEV_NAME_LEN
3287                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3288         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3289
3290         /* Get our block major device number. */
3291
3292         rc = register_blkdev(0, rbd_dev->name);
3293         if (rc < 0)
3294                 goto err_out_id;
3295         rbd_dev->major = rc;
3296
3297         /* Set up the blkdev mapping. */
3298
3299         rc = rbd_init_disk(rbd_dev);
3300         if (rc)
3301                 goto err_out_blkdev;
3302
3303         rc = rbd_bus_add_dev(rbd_dev);
3304         if (rc)
3305                 goto err_out_disk;
3306
3307         /*
3308          * At this point cleanup in the event of an error is the job
3309          * of the sysfs code (initiated by rbd_bus_del_dev()).
3310          */
3311
3312         down_write(&rbd_dev->header_rwsem);
3313         rc = rbd_dev_snaps_register(rbd_dev);
3314         up_write(&rbd_dev->header_rwsem);
3315         if (rc)
3316                 goto err_out_bus;
3317
3318         rc = rbd_init_watch_dev(rbd_dev);
3319         if (rc)
3320                 goto err_out_bus;
3321
3322         kfree(rbd_opts);
3323
3324         /* Everything's ready.  Announce the disk to the world. */
3325
3326         add_disk(rbd_dev->disk);
3327
3328         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3329                 (unsigned long long) rbd_dev->mapping.size);
3330
3331         return count;
3332
3333 err_out_bus:
3334         /* this will also clean up rest of rbd_dev stuff */
3335
3336         rbd_bus_del_dev(rbd_dev);
3337         kfree(rbd_opts);
3338
3339         return rc;
3340
3341 err_out_disk:
3342         rbd_free_disk(rbd_dev);
3343 err_out_blkdev:
3344         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3345 err_out_id:
3346         rbd_dev_id_put(rbd_dev);
3347 err_out_snaps:
3348         rbd_remove_all_snaps(rbd_dev);
3349 err_out_probe:
3350         rbd_header_free(&rbd_dev->header);
3351 err_out_client:
3352         kfree(rbd_dev->header_name);
3353         rbd_put_client(rbdc);
3354 err_out_args:
3355         if (ceph_opts)
3356                 ceph_destroy_options(ceph_opts);
3357         kfree(rbd_opts);
3358         rbd_spec_put(spec);
3359 err_out_mem:
3360         kfree(rbd_dev);
3361
3362         dout("Error adding device %s\n", buf);
3363         module_put(THIS_MODULE);
3364
3365         return (ssize_t) rc;
3366 }
3367
3368 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3369 {
3370         struct list_head *tmp;
3371         struct rbd_device *rbd_dev;
3372
3373         spin_lock(&rbd_dev_list_lock);
3374         list_for_each(tmp, &rbd_dev_list) {
3375                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3376                 if (rbd_dev->dev_id == dev_id) {
3377                         spin_unlock(&rbd_dev_list_lock);
3378                         return rbd_dev;
3379                 }
3380         }
3381         spin_unlock(&rbd_dev_list_lock);
3382         return NULL;
3383 }
3384
/*
 * Driver-core release callback for an rbd device, invoked when the
 * last reference to rbd_dev->dev is dropped (initiated by
 * rbd_bus_del_dev()/device_unregister()).  Tears down the watch,
 * client, disk, and the rbd_dev itself, and drops the module
 * reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object before tearing anything down */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev->rbd_client);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->header_name);
	rbd_dev_id_put(rbd_dev);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);

	/* release module ref (balances try_module_get() in rbd_add()) */
	module_put(THIS_MODULE);
}
3416
3417 static ssize_t rbd_remove(struct bus_type *bus,
3418                           const char *buf,
3419                           size_t count)
3420 {
3421         struct rbd_device *rbd_dev = NULL;
3422         int target_id, rc;
3423         unsigned long ul;
3424         int ret = count;
3425
3426         rc = strict_strtoul(buf, 10, &ul);
3427         if (rc)
3428                 return rc;
3429
3430         /* convert to int; abort if we lost anything in the conversion */
3431         target_id = (int) ul;
3432         if (target_id != ul)
3433                 return -EINVAL;
3434
3435         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3436
3437         rbd_dev = __rbd_get_dev(target_id);
3438         if (!rbd_dev) {
3439                 ret = -ENOENT;
3440                 goto done;
3441         }
3442
3443         rbd_remove_all_snaps(rbd_dev);
3444         rbd_bus_del_dev(rbd_dev);
3445
3446 done:
3447         mutex_unlock(&ctl_mutex);
3448
3449         return ret;
3450 }
3451
3452 /*
3453  * create control files in sysfs
3454  * /sys/bus/rbd/...
3455  */
3456 static int rbd_sysfs_init(void)
3457 {
3458         int ret;
3459
3460         ret = device_register(&rbd_root_dev);
3461         if (ret < 0)
3462                 return ret;
3463
3464         ret = bus_register(&rbd_bus_type);
3465         if (ret < 0)
3466                 device_unregister(&rbd_root_dev);
3467
3468         return ret;
3469 }
3470
/* Tear down /sys/bus/rbd in the reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
3476
3477 int __init rbd_init(void)
3478 {
3479         int rc;
3480
3481         rc = rbd_sysfs_init();
3482         if (rc)
3483                 return rc;
3484         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3485         return 0;
3486 }
3487
/* Module unload: remove the sysfs control files */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
3492
/* Module entry points and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");