]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: add reference counting to rbd_spec
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;    /* NUL-terminated copy of the object-name prefix */
        u64 features;           /* RBD_FEATURE_* bits; always 0 for v1 images */
        __u8 obj_order;         /* log2 of the object (segment) size in bytes */
        __u8 crypt_type;        /* on-disk crypt type (copied from v1 header) */
        __u8 comp_type;         /* on-disk compression type (copied from v1 header) */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* total image size, in bytes */
        struct ceph_snap_context *snapc;        /* snapshot ids, newest first */
        char *snap_names;       /* packed, NUL-terminated snapshot names */
        u64 *snap_sizes;        /* per-snapshot image sizes, parallel to snapc */

        u64 obj_version;        /* header object version; NOTE(review): exact
                                 * semantics not visible here -- confirm */
};
114
115 /*
116  * An rbd image specification.
117  *
118  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
119  * identify an image.
120  */
struct rbd_spec {
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        size_t          image_id_len;
        char            *image_name;
        size_t          image_name_len;

        u64             snap_id;        /* CEPH_NOSNAP when the head is mapped */
        char            *snap_name;     /* RBD_SNAP_HEAD_NAME ("-") for the head */

        struct kref     kref;           /* specs are refcounted and shared */
};
135
/* Per-mapping options (see parse_rbd_opts_token()) */
struct rbd_options {
        bool    read_only;      /* defaults to RBD_READ_ONLY_DEFAULT */
};
139
140 /*
141  * an instance of the client.  multiple devices may share an rbd client.
142  */
struct rbd_client {
        struct ceph_client      *client;        /* the underlying ceph client */
        struct kref             kref;           /* clients are shared by devices */
        struct list_head        node;           /* entry on rbd_client_list */
};
148
149 /*
150  * a request completion status
151  */
struct rbd_req_status {
        int done;       /* nonzero once this request has completed */
        int rc;         /* completion status (errno-style) */
        u64 bytes;      /* bytes transferred for this request */
};
157
158 /*
159  * a collection of requests
160  */
161 struct rbd_req_coll {
162         int                     total;
163         int                     num_done;
164         struct kref             kref;
165         struct rbd_req_status   status[0];
166 };
167
168 /*
169  * a single io request
170  */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* length of this request, bytes */
        int                     coll_index;     /* slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* owning collection (or NULL) */
};
179
struct rbd_snap {
        struct  device          dev;            /* sysfs device for this snapshot */
        const char              *name;          /* snapshot name */
        u64                     size;           /* image size for this snapshot */
        struct list_head        node;           /* entry on rbd_dev->snaps */
        u64                     id;             /* snapshot id */
        u64                     features;       /* feature bits for this snapshot */
};
188
/* What is currently mapped: the image head or a single snapshot */
struct rbd_mapping {
        u64                     size;           /* mapped size, in bytes */
        u64                     features;       /* feature bits in effect */
        bool                    read_only;      /* snapshot mappings are read-only */
};
194
195 /*
196  * a single device
197  */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;    /* shared ceph client handle */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;         /* in-memory image metadata */
        bool                    exists;         /* set when mapping established
                                                 * (see rbd_dev_set_mapping()) */
        struct rbd_spec         *spec;          /* pool/image/snap identity */

        char                    *header_name;   /* header object name; set
                                                 * elsewhere -- TODO confirm */

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* size/features/ro of mapping */

        struct list_head        node;           /* entry on rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
233
234 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
235
236 static LIST_HEAD(rbd_dev_list);    /* devices */
237 static DEFINE_SPINLOCK(rbd_dev_list_lock);
238
239 static LIST_HEAD(rbd_client_list);              /* clients */
240 static DEFINE_SPINLOCK(rbd_client_list_lock);
241
242 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
243 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
244
245 static void rbd_dev_release(struct device *dev);
246 static void rbd_remove_snap_dev(struct rbd_snap *snap);
247
248 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
249                        size_t count);
250 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
251                           size_t count);
252
/* Bus attributes: /sys/bus/rbd/{add,remove}, write-only, root only */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
263
/*
 * Intentionally empty: rbd_root_dev is static, so there is nothing
 * to free, but a release callback must still be supplied.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
272
#ifdef RBD_DEBUG
/*
 * rbd_assert() -- print the failed expression and BUG() if it is false.
 *
 * Wrapped in do { } while (0) so the macro expands to a single
 * statement; the previous bare if-block form would silently capture a
 * following "else" (dangling-else) when used unbraced.
 */
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
285
/* Take a reference on the rbd device's sysfs device (used by rbd_open()) */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
        return get_device(&rbd_dev->dev);
}
290
/* Drop the reference taken by rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
        put_device(&rbd_dev->dev);
}
295
296 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
297 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
298
/*
 * Block device open.  Opening a read-only mapping for write fails with
 * -EROFS; otherwise take a device reference (dropped in rbd_release())
 * and propagate the read-only flag to the block layer.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        rbd_get_dev(rbd_dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);

        return 0;
}
311
/* Block device release: drop the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;

        rbd_put_dev(rbd_dev);

        return 0;
}
320
/* Block device operations; rbd supports only open/release */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
326
327 /*
328  * Initialize an rbd client instance.
329  * We own *ceph_opts.
330  */
331 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
332 {
333         struct rbd_client *rbdc;
334         int ret = -ENOMEM;
335
336         dout("rbd_client_create\n");
337         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
338         if (!rbdc)
339                 goto out_opt;
340
341         kref_init(&rbdc->kref);
342         INIT_LIST_HEAD(&rbdc->node);
343
344         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
345
346         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
347         if (IS_ERR(rbdc->client))
348                 goto out_mutex;
349         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
350
351         ret = ceph_open_session(rbdc->client);
352         if (ret < 0)
353                 goto out_err;
354
355         spin_lock(&rbd_client_list_lock);
356         list_add_tail(&rbdc->node, &rbd_client_list);
357         spin_unlock(&rbd_client_list_lock);
358
359         mutex_unlock(&ctl_mutex);
360
361         dout("rbd_client_create created %p\n", rbdc);
362         return rbdc;
363
364 out_err:
365         ceph_destroy_client(rbdc->client);
366 out_mutex:
367         mutex_unlock(&ctl_mutex);
368         kfree(rbdc);
369 out_opt:
370         if (ceph_opts)
371                 ceph_destroy_options(ceph_opts);
372         return ERR_PTR(ret);
373 }
374
375 /*
376  * Find a ceph client with specific addr and configuration.  If
377  * found, bump its reference count.
378  */
379 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
380 {
381         struct rbd_client *client_node;
382         bool found = false;
383
384         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
385                 return NULL;
386
387         spin_lock(&rbd_client_list_lock);
388         list_for_each_entry(client_node, &rbd_client_list, node) {
389                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
390                         kref_get(&client_node->kref);
391                         found = true;
392                         break;
393                 }
394         }
395         spin_unlock(&rbd_client_list_lock);
396
397         return found ? client_node : NULL;
398 }
399
400 /*
401  * mount options
402  */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

/* Token table for match_token(); groups must mirror the enum above */
static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
424
/*
 * Parse one mount-style option token into *private (an rbd_options).
 * Integer and string arguments are currently only logged -- no such
 * options exist yet -- while Boolean tokens set read_only.  Returns 0
 * on success, -EINVAL for an unrecognized token, or the match_int()
 * error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                /* match_token() succeeded, so the token must be known */
                rbd_assert(false);
                break;
        }
        return 0;
}
465
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * Consumes ceph_opts either way: when an existing client is reused
 * the options are destroyed here; otherwise ownership passes to
 * rbd_client_create().  On success the client is stored in
 * rbd_dev->rbd_client and 0 is returned.
 */
static int rbd_get_client(struct rbd_device *rbd_dev,
                                struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}
488
/*
 * Destroy ceph client.
 *
 * Called via kref_put() when the last reference is dropped.  The
 * caller must NOT hold rbd_client_list_lock; this function acquires
 * it itself to unlink the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        /* Unlink from the global client list before tearing down */
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}
506
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
        rbd_dev->rbd_client = NULL;     /* guard against use after put */
}
516
/*
 * Destroy requests collection (kref release callback)
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}
528
529 static bool rbd_image_format_valid(u32 image_format)
530 {
531         return image_format == 1 || image_format == 2;
532 }
533
/*
 * Sanity-check a (v1) on-disk image header: magic text, an object
 * order in [SECTOR_SHIFT, 8*sizeof(int) - 1], and snapshot count and
 * name lengths whose derived header size fits in a size_t.  Returns
 * false if anything is inconsistent.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
572
573 /*
574  * Create a new header structure, translate header format from the on-disk
575  * header.
576  */
577 static int rbd_header_from_disk(struct rbd_image_header *header,
578                                  struct rbd_image_header_ondisk *ondisk)
579 {
580         u32 snap_count;
581         size_t len;
582         size_t size;
583         u32 i;
584
585         memset(header, 0, sizeof (*header));
586
587         snap_count = le32_to_cpu(ondisk->snap_count);
588
589         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
590         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
591         if (!header->object_prefix)
592                 return -ENOMEM;
593         memcpy(header->object_prefix, ondisk->object_prefix, len);
594         header->object_prefix[len] = '\0';
595
596         if (snap_count) {
597                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
598
599                 /* Save a copy of the snapshot names */
600
601                 if (snap_names_len > (u64) SIZE_MAX)
602                         return -EIO;
603                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
604                 if (!header->snap_names)
605                         goto out_err;
606                 /*
607                  * Note that rbd_dev_v1_header_read() guarantees
608                  * the ondisk buffer we're working with has
609                  * snap_names_len bytes beyond the end of the
610                  * snapshot id array, this memcpy() is safe.
611                  */
612                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
613                         snap_names_len);
614
615                 /* Record each snapshot's size */
616
617                 size = snap_count * sizeof (*header->snap_sizes);
618                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
619                 if (!header->snap_sizes)
620                         goto out_err;
621                 for (i = 0; i < snap_count; i++)
622                         header->snap_sizes[i] =
623                                 le64_to_cpu(ondisk->snaps[i].image_size);
624         } else {
625                 WARN_ON(ondisk->snap_names_len);
626                 header->snap_names = NULL;
627                 header->snap_sizes = NULL;
628         }
629
630         header->features = 0;   /* No features support in v1 images */
631         header->obj_order = ondisk->options.order;
632         header->crypt_type = ondisk->options.crypt_type;
633         header->comp_type = ondisk->options.comp_type;
634
635         /* Allocate and fill in the snapshot context */
636
637         header->image_size = le64_to_cpu(ondisk->image_size);
638         size = sizeof (struct ceph_snap_context);
639         size += snap_count * sizeof (header->snapc->snaps[0]);
640         header->snapc = kzalloc(size, GFP_KERNEL);
641         if (!header->snapc)
642                 goto out_err;
643
644         atomic_set(&header->snapc->nref, 1);
645         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
646         header->snapc->num_snaps = snap_count;
647         for (i = 0; i < snap_count; i++)
648                 header->snapc->snaps[i] =
649                         le64_to_cpu(ondisk->snaps[i].id);
650
651         return 0;
652
653 out_err:
654         kfree(header->snap_sizes);
655         header->snap_sizes = NULL;
656         kfree(header->snap_names);
657         header->snap_names = NULL;
658         kfree(header->object_prefix);
659         header->object_prefix = NULL;
660
661         return -ENOMEM;
662 }
663
664 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
665 {
666
667         struct rbd_snap *snap;
668
669         list_for_each_entry(snap, &rbd_dev->snaps, node) {
670                 if (!strcmp(snap_name, snap->name)) {
671                         rbd_dev->spec->snap_id = snap->id;
672                         rbd_dev->mapping.size = snap->size;
673                         rbd_dev->mapping.features = snap->features;
674
675                         return 0;
676                 }
677         }
678
679         return -ENOENT;
680 }
681
/*
 * Set the device mapping (snap id, size, features) according to the
 * snapshot name in the spec.  The special name RBD_SNAP_HEAD_NAME
 * ("-") maps the image head; any other name maps that snapshot,
 * which is always read-only.  Returns 0 on success or -ENOENT when
 * the named snapshot does not exist.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        int ret;

        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
                ret = 0;
        } else {
                ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (ret < 0)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
        rbd_dev->exists = true;
done:
        return ret;
}
702
/*
 * Release everything a header owns and reset its pointers; safe to
 * call on a partially initialized header (kfree(NULL) is a no-op).
 */
static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);   /* drops a ref, may free */
        header->snapc = NULL;
}
714
/*
 * Format the object name for the segment containing the given image
 * offset: "<object_prefix>.%012llx".  Returns a kmalloc'd string the
 * caller must kfree, or NULL on allocation or formatting failure.
 */
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        /* size limit is one below the allocation, so NUL always fits */
        ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
736
737 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
738 {
739         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
740
741         return offset & (segment_size - 1);
742 }
743
744 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
745                                 u64 offset, u64 length)
746 {
747         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
748
749         offset &= segment_size - 1;
750
751         rbd_assert(length <= U64_MAX - offset);
752         if (offset + length > segment_size)
753                 length = segment_size - offset;
754
755         return length;
756 }
757
/*
 * Number of segments spanned by the byte range [ofs, ofs + len).
 * Returns 0 for an empty range and -ERANGE if ofs + len would wrap.
 *
 * NOTE(review): the count is computed in u64 but returned as int;
 * for a huge range the difference could overflow int -- confirm
 * callers bound len appropriately.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg;
        u64 end_seg;

        if (!len)
                return 0;
        if (len - 1 > U64_MAX - ofs)
                return -ERANGE;

        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;

        return end_seg - start_seg + 1;
}
774
775 /*
776  * returns the size of an object in the image
777  */
778 static u64 rbd_obj_bytes(struct rbd_image_header *header)
779 {
780         return 1 << header->obj_order;
781 }
782
783 /*
784  * bio helpers
785  */
786
787 static void bio_chain_put(struct bio *chain)
788 {
789         struct bio *tmp;
790
791         while (chain) {
792                 tmp = chain;
793                 chain = chain->bi_next;
794                 bio_put(tmp);
795         }
796 }
797
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; any bytes at or
 * beyond start_ofs (counted from the start of the chain) are cleared
 * to zero.  Segments entirely before start_ofs are left untouched.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /* zero from start_ofs, or the whole segment
                                 * if it begins past start_ofs */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
824
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * The clone shares the source's pages; only the bio_vec descriptors
 * covering [offset, offset + len) are copied, with the first and last
 * entries trimmed.  Returns NULL on invalid arguments (warned once)
 * or on allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;      /* byte offset into the first cloned segment */
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                /* resid is how far into the last segment the range ends */
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
905
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 *
 * On error NULL is returned and any partially built chain is freed;
 * *bio_src and *offset are not updated in that case.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;       /* where the next clone gets linked in */

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi)
                        goto out_err;   /* EINVAL; ran out of bio's */
                /* clone no more than what remains of this source bio */
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
966
967 /*
968  * helpers for osd request op vectors.
969  */
970 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
971                                         int opcode, u32 payload_len)
972 {
973         struct ceph_osd_req_op *ops;
974
975         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
976         if (!ops)
977                 return NULL;
978
979         ops[0].op = opcode;
980
981         /*
982          * op extent offset and length will be set later on
983          * in calc_raw_layout()
984          */
985         ops[0].payload_len = payload_len;
986
987         return ops;
988 }
989
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
994
995 static void rbd_coll_end_req_index(struct request *rq,
996                                    struct rbd_req_coll *coll,
997                                    int index,
998                                    int ret, u64 len)
999 {
1000         struct request_queue *q;
1001         int min, max, i;
1002
1003         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1004              coll, index, ret, (unsigned long long) len);
1005
1006         if (!rq)
1007                 return;
1008
1009         if (!coll) {
1010                 blk_end_request(rq, ret, len);
1011                 return;
1012         }
1013
1014         q = rq->q;
1015
1016         spin_lock_irq(q->queue_lock);
1017         coll->status[index].done = 1;
1018         coll->status[index].rc = ret;
1019         coll->status[index].bytes = len;
1020         max = min = coll->num_done;
1021         while (max < coll->total && coll->status[max].done)
1022                 max++;
1023
1024         for (i = min; i<max; i++) {
1025                 __blk_end_request(rq, coll->status[i].rc,
1026                                   coll->status[i].bytes);
1027                 coll->num_done++;
1028                 kref_put(&coll->kref, rbd_coll_release);
1029         }
1030         spin_unlock_irq(q->queue_lock);
1031 }
1032
/*
 * Complete the collection slot associated with a single rbd request,
 * using the collection and index stored in the request itself.
 */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
1038
1039 /*
1040  * Send ceph osd request
1041  */
/*
 * Send ceph osd request
 *
 * Build and submit one OSD request against object_name covering the
 * byte range [ofs, ofs + len).  Data is carried either by a cloned
 * bio chain or by a page vector (pages/num_pages), depending on the
 * caller.
 *
 * If rbd_cb is non-NULL the request completes asynchronously and the
 * callback is responsible for cleanup; if it is NULL the request is
 * waited on synchronously and released here.  coll/coll_index, when
 * supplied, tie this request into a completion collection so a block
 * request split across segments completes in order.  If linger_req is
 * non-NULL the request is registered as lingering (used for watch)
 * and returned to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* Still record the failure in the collection slot */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len, coll, coll_index);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	/* Stash what the completion callback needs to finish and free */
	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	/*
	 * NOTE(review): the request head snapid is unconditionally set
	 * to CEPH_NOSNAP here; presumably the snapid parameter takes
	 * effect via ceph_calc_raw_layout() below -- confirm.
	 */
	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	/*
	 * NOTE(review): strncpy() does not NUL-terminate if object_name
	 * fills r_oid exactly; assumes callers keep names shorter than
	 * sizeof(req->r_oid) -- confirm.
	 */
	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* One object per stripe unit; map the range onto an object */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		/* Keep the request alive across osd map changes */
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* No callback: synchronous caller, wait and clean up here */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
1152
1153 /*
1154  * Ceph osd op callback
1155  */
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous read/write requests issued by
 * rbd_do_op().  Translates osd results into block-layer semantics:
 * a read of a nonexistent object (-ENOENT) and a short read both
 * produce zeroed data for the unbacked portion of the range.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* first op follows the head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Object doesn't exist: a hole in the image reads as zeros */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero-fill the remainder of the request */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1192
/*
 * Completion callback for requests that need no result processing;
 * just drop the osd request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1197
1198 /*
1199  * Do a synchronous ceph osd operation
1200  */
/*
 * Do a synchronous ceph osd operation
 *
 * Data is exchanged through a freshly allocated page vector sized for
 * inbound_size bytes at offset ofs.  For read operations, the bytes
 * the osd reported handling are copied back into the caller's inbound
 * buffer (when one is supplied).
 *
 * Returns the non-negative result of the operation, or a negative
 * errno on failure.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* NULL rq/coll/callback make rbd_do_request() wait synchronously */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	/* Copy read data (ret = bytes handled) out to the caller */
	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1241
1242 /*
1243  * Do an asynchronous ceph osd operation
1244  */
/*
 * Do an asynchronous ceph osd operation
 *
 * Issue a read or write (chosen from the block request's data
 * direction) for one object segment, carrying data via the supplied
 * cloned bio chain.  Completion is reported through rbd_req_cb()
 * into the coll/coll_index collection slot.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;
	int opcode;
	int flags;
	u64 snapid;

	/* Map the image offset onto an object name/offset/length */
	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	if (rq_data_dir(rq) == WRITE) {
		opcode = CEPH_OSD_OP_WRITE;
		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
		snapid = CEPH_NOSNAP;	/* writes always target the head */
		payload_len = seg_len;
	} else {
		opcode = CEPH_OSD_OP_READ;
		flags = CEPH_OSD_FLAG_READ;
		snapc = NULL;		/* snap context only matters for writes */
		snapid = rbd_dev->spec->snap_id;
		payload_len = 0;
	}

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1306
1307 /*
1308  * Request sync osd read
1309  */
1310 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1311                           u64 snapid,
1312                           const char *object_name,
1313                           u64 ofs, u64 len,
1314                           char *buf,
1315                           u64 *ver)
1316 {
1317         struct ceph_osd_req_op *ops;
1318         int ret;
1319
1320         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1321         if (!ops)
1322                 return -ENOMEM;
1323
1324         ret = rbd_req_sync_op(rbd_dev, NULL,
1325                                snapid,
1326                                CEPH_OSD_FLAG_READ,
1327                                ops, object_name, ofs, len, buf, NULL, ver);
1328         rbd_destroy_ops(ops);
1329
1330         return ret;
1331 }
1332
1333 /*
1334  * Request sync osd watch
1335  */
/*
 * Request sync osd watch
 *
 * Acknowledge a notification on the header object.  Despite the
 * function name the ack itself completes asynchronously --
 * rbd_simple_req_cb() just drops the request reference.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): notify_id is stored without cpu_to_le64() while
	 * ver above is converted -- presumably notify_id already arrives
	 * in wire byte order from the osd client; confirm against the
	 * caller of rbd_watch_cb().
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;	/* 0 => ack, not establishing a watch */

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1362
1363 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1364 {
1365         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1366         u64 hver;
1367         int rc;
1368
1369         if (!rbd_dev)
1370                 return;
1371
1372         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1373                 rbd_dev->header_name, (unsigned long long) notify_id,
1374                 (unsigned int) opcode);
1375         rc = rbd_dev_refresh(rbd_dev, &hver);
1376         if (rc)
1377                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1378                            " update snaps: %d\n", rbd_dev->major, rc);
1379
1380         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1381 }
1382
1383 /*
1384  * Request sync osd watch
1385  */
/*
 * Request sync osd watch
 *
 * Establish a watch on the header object so the osd notifies us of
 * header changes via rbd_watch_cb().  The osd event is created first,
 * then the watch op is registered as a lingering request so it
 * survives osd map changes.  On failure both the event and the op
 * vector are torn down.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 => establish the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1426
1427 /*
1428  * Request sync osd unwatch
1429  */
1430 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1431 {
1432         struct ceph_osd_req_op *ops;
1433         int ret;
1434
1435         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1436         if (!ops)
1437                 return -ENOMEM;
1438
1439         ops[0].watch.ver = 0;
1440         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1441         ops[0].watch.flag = 0;
1442
1443         ret = rbd_req_sync_op(rbd_dev, NULL,
1444                               CEPH_NOSNAP,
1445                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1446                               ops,
1447                               rbd_dev->header_name,
1448                               0, 0, NULL, NULL, NULL);
1449
1450
1451         rbd_destroy_ops(ops);
1452         ceph_osdc_cancel_event(rbd_dev->watch_event);
1453         rbd_dev->watch_event = NULL;
1454         return ret;
1455 }
1456
1457 /*
1458  * Synchronous osd object method call
1459  */
/*
 * Synchronous osd object method call
 *
 * Invoke class_name.method_name on the named object.  outbound (of
 * outbound_size bytes) supplies the method's input; up to inbound_size
 * bytes of reply data are returned in inbound.  flags selects read or
 * write semantics for the call.  Returns the operation result, or a
 * negative errno.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       flags, ops,
			       object_name, 0, inbound_size, inbound,
			       NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1509
1510 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1511 {
1512         struct rbd_req_coll *coll =
1513                         kzalloc(sizeof(struct rbd_req_coll) +
1514                                 sizeof(struct rbd_req_status) * num_reqs,
1515                                 GFP_ATOMIC);
1516
1517         if (!coll)
1518                 return NULL;
1519         coll->total = num_reqs;
1520         kref_init(&coll->kref);
1521         return coll;
1522 }
1523
1524 /*
1525  * block device queue callback
1526  */
/*
 * block device queue callback
 *
 * Pulls requests off the queue and splits each one into per-object
 * segments, issuing one osd request per segment.  A ref-counted
 * collection ties the segments together so the block request
 * completes in order as segments finish.
 *
 * Entered with q->queue_lock held.  The lock is dropped while osd
 * requests are built and submitted, and retaken before completing a
 * request or fetching the next one.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		bool do_write;
		unsigned int size;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;
		unsigned int bio_offset;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* Drop the queue lock for the (sleeping) work below */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* A mapped snapshot may have disappeared under us */
		if (!rbd_dev->exists) {
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* Pin the snapshot context for the writes issued below */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		bio = rq->bio;

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		/* Issue one cloned-bio osd request per object segment */
		bio_offset = 0;
		do {
			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
			unsigned int chain_size;
			struct bio *bio_chain;

			BUG_ON(limit > (u64) UINT_MAX);
			chain_size = (unsigned int) limit;
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

			/* Reference dropped when this segment completes */
			kref_get(&coll->kref);

			/* Pass a cloned bio chain via an osd request */

			bio_chain = bio_chain_clone_range(&bio,
						&bio_offset, chain_size,
						GFP_ATOMIC);
			if (bio_chain)
				(void) rbd_do_op(rq, rbd_dev, snapc,
						ofs, chain_size,
						bio_chain, coll, cur_seg);
			else
				/* Record the failure in this segment's slot */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, chain_size);
			size -= chain_size;
			ofs += chain_size;

			cur_seg++;
		} while (size > 0);
		/* Drop the initial reference from rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1633
1634 /*
1635  * a queue callback. Makes sure that we don't create a bio that spans across
1636  * multiple osd objects. One exception would be with a single page bios,
1637  * which we handle later at bio_chain_clone_range()
1638  */
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of bvec that may be merged into the
 * bio described by bmd (possibly 0, possibly less than bv_len).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	/* NOTE(review): assumes obj_order >= SECTOR_SHIFT -- confirm */
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
1679
1680 static void rbd_free_disk(struct rbd_device *rbd_dev)
1681 {
1682         struct gendisk *disk = rbd_dev->disk;
1683
1684         if (!disk)
1685                 return;
1686
1687         if (disk->flags & GENHD_FL_UP)
1688                 del_gendisk(disk);
1689         if (disk->queue)
1690                 blk_cleanup_queue(disk->queue);
1691         put_disk(disk);
1692 }
1693
1694 /*
1695  * Read the complete header for the given rbd device.
1696  *
1697  * Returns a pointer to a dynamically-allocated buffer containing
1698  * the complete and validated header.  Caller can pass the address
1699  * of a variable that will be filled in with the version of the
1700  * header object at the time it was read.
1701  *
1702  * Returns a pointer-coded errno if a failure occurs.
1703  */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;	/* snapshot count from the previous pass */
	u64 names_size = 0;	/* snapshot names size from the previous pass */
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);	/* free the previous (too small) attempt */

		/* Size the buffer from the counts seen on the last pass */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		/* A short read means we got less than a full header */
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->spec->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->spec->image_name);
			goto out_err;
		}

		/* Retry if the snapshot count changed since we sized it */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1765
1766 /*
1767  * reload the ondisk the header
1768  */
1769 static int rbd_read_header(struct rbd_device *rbd_dev,
1770                            struct rbd_image_header *header)
1771 {
1772         struct rbd_image_header_ondisk *ondisk;
1773         u64 ver = 0;
1774         int ret;
1775
1776         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1777         if (IS_ERR(ondisk))
1778                 return PTR_ERR(ondisk);
1779         ret = rbd_header_from_disk(header, ondisk);
1780         if (ret >= 0)
1781                 header->obj_version = ver;
1782         kfree(ondisk);
1783
1784         return ret;
1785 }
1786
/* Remove every snapshot device registered under this rbd device. */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe variant: rbd_remove_snap_dev() unlinks the entry */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}
1795
/*
 * Propagate a changed image size to the mapping and the gendisk
 * capacity.  Only applies when the head of the image is mapped; a
 * mapped snapshot's size is fixed, so nothing is done for it.
 */
static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;		/* mapped snapshots never resize */

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}
1808
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the format 1 on-disk header and swaps the freshly-read
 * snapshot data into the in-core header under header_rwsem, then
 * reconciles and re-registers the snapshot list.  Returns 0 or a
 * negative errno.  If hver is non-NULL the new header object version
 * is stored through it.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	/* NOTE(review): image_size was already copied above; this
	 * second assignment is redundant but harmless. */
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* Reconcile in-core snapshot list with the new snap context */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1852
1853 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1854 {
1855         int ret;
1856
1857         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1858         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1859         if (rbd_dev->image_format == 1)
1860                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1861         else
1862                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1863         mutex_unlock(&ctl_mutex);
1864
1865         return ret;
1866 }
1867
/*
 * Allocate and set up the gendisk and request queue for an rbd
 * device, then publish its initial capacity.  Returns 0 or -ENOMEM.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	/* Only failure after alloc_disk() is blk_init_queue() -> -ENOMEM */
	put_disk(disk);

	return -ENOMEM;
}
1916
/*
  sysfs
*/

/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1925
1926 static ssize_t rbd_size_show(struct device *dev,
1927                              struct device_attribute *attr, char *buf)
1928 {
1929         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1930         sector_t size;
1931
1932         down_read(&rbd_dev->header_rwsem);
1933         size = get_capacity(rbd_dev->disk);
1934         up_read(&rbd_dev->header_rwsem);
1935
1936         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1937 }
1938
1939 /*
1940  * Note this shows the features for whatever's mapped, which is not
1941  * necessarily the base image.
1942  */
1943 static ssize_t rbd_features_show(struct device *dev,
1944                              struct device_attribute *attr, char *buf)
1945 {
1946         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1947
1948         return sprintf(buf, "0x%016llx\n",
1949                         (unsigned long long) rbd_dev->mapping.features);
1950 }
1951
1952 static ssize_t rbd_major_show(struct device *dev,
1953                               struct device_attribute *attr, char *buf)
1954 {
1955         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1956
1957         return sprintf(buf, "%d\n", rbd_dev->major);
1958 }
1959
1960 static ssize_t rbd_client_id_show(struct device *dev,
1961                                   struct device_attribute *attr, char *buf)
1962 {
1963         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1964
1965         return sprintf(buf, "client%lld\n",
1966                         ceph_client_id(rbd_dev->rbd_client->client));
1967 }
1968
1969 static ssize_t rbd_pool_show(struct device *dev,
1970                              struct device_attribute *attr, char *buf)
1971 {
1972         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1973
1974         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1975 }
1976
1977 static ssize_t rbd_pool_id_show(struct device *dev,
1978                              struct device_attribute *attr, char *buf)
1979 {
1980         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1981
1982         return sprintf(buf, "%llu\n",
1983                 (unsigned long long) rbd_dev->spec->pool_id);
1984 }
1985
1986 static ssize_t rbd_name_show(struct device *dev,
1987                              struct device_attribute *attr, char *buf)
1988 {
1989         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1990
1991         return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
1992 }
1993
1994 static ssize_t rbd_image_id_show(struct device *dev,
1995                              struct device_attribute *attr, char *buf)
1996 {
1997         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1998
1999         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2000 }
2001
2002 /*
2003  * Shows the name of the currently-mapped snapshot (or
2004  * RBD_SNAP_HEAD_NAME for the base image).
2005  */
2006 static ssize_t rbd_snap_show(struct device *dev,
2007                              struct device_attribute *attr,
2008                              char *buf)
2009 {
2010         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2011
2012         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2013 }
2014
2015 static ssize_t rbd_image_refresh(struct device *dev,
2016                                  struct device_attribute *attr,
2017                                  const char *buf,
2018                                  size_t size)
2019 {
2020         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2021         int ret;
2022
2023         ret = rbd_dev_refresh(rbd_dev, NULL);
2024
2025         return ret < 0 ? ret : size;
2026 }
2027
/* Per-device sysfs attributes; all read-only except "refresh". */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
2061
/*
 * No-op release for the device type; the actual release callback is
 * installed on the device itself (see rbd_bus_add_dev()).
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2071
2072
2073 /*
2074   sysfs - snapshots
2075 */
2076
2077 static ssize_t rbd_snap_size_show(struct device *dev,
2078                                   struct device_attribute *attr,
2079                                   char *buf)
2080 {
2081         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2082
2083         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2084 }
2085
2086 static ssize_t rbd_snap_id_show(struct device *dev,
2087                                 struct device_attribute *attr,
2088                                 char *buf)
2089 {
2090         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2091
2092         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2093 }
2094
2095 static ssize_t rbd_snap_features_show(struct device *dev,
2096                                 struct device_attribute *attr,
2097                                 char *buf)
2098 {
2099         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2100
2101         return sprintf(buf, "0x%016llx\n",
2102                         (unsigned long long) snap->features);
2103 }
2104
/* Per-snapshot sysfs attributes (all read-only). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2119
/*
 * Device release callback for a snapshot: frees the rbd_snap and its
 * name when the last reference to its device goes away.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
2126
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Device type for snapshot devices registered under an rbd device. */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2136
/* Take a reference on an rbd_spec; returns its argument for chaining. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}
2143
2144 static void rbd_spec_free(struct kref *kref);
2145 static void rbd_spec_put(struct rbd_spec *spec)
2146 {
2147         if (spec)
2148                 kref_put(&spec->kref, rbd_spec_free);
2149 }
2150
/*
 * Allocate a zeroed rbd_spec holding one (caller-owned) reference.
 * Returns NULL on allocation failure.
 */
static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	/* No-op get/put pair exercising the new refcounting */
	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}
2164
2165 static void rbd_spec_free(struct kref *kref)
2166 {
2167         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2168
2169         kfree(spec->pool_name);
2170         kfree(spec->image_id);
2171         kfree(spec->image_name);
2172         kfree(spec->snap_name);
2173         kfree(spec);
2174 }
2175
/*
 * Report whether a snapshot's sysfs device has been registered.  The
 * device type is only assigned in rbd_register_snap_dev(), so it must
 * agree with device_is_registered(); "!ret ^ reg" is true exactly
 * when ret == reg, and the assert enforces that consistency.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
2185
2186 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2187 {
2188         list_del(&snap->node);
2189         if (device_is_registered(&snap->dev))
2190                 device_unregister(&snap->dev);
2191 }
2192
2193 static int rbd_register_snap_dev(struct rbd_snap *snap,
2194                                   struct device *parent)
2195 {
2196         struct device *dev = &snap->dev;
2197         int ret;
2198
2199         dev->type = &rbd_snap_device_type;
2200         dev->parent = parent;
2201         dev->release = rbd_snap_dev_release;
2202         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2203         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2204
2205         ret = device_register(dev);
2206
2207         return ret;
2208 }
2209
2210 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2211                                                 const char *snap_name,
2212                                                 u64 snap_id, u64 snap_size,
2213                                                 u64 snap_features)
2214 {
2215         struct rbd_snap *snap;
2216         int ret;
2217
2218         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2219         if (!snap)
2220                 return ERR_PTR(-ENOMEM);
2221
2222         ret = -ENOMEM;
2223         snap->name = kstrdup(snap_name, GFP_KERNEL);
2224         if (!snap->name)
2225                 goto err;
2226
2227         snap->id = snap_id;
2228         snap->size = snap_size;
2229         snap->features = snap_features;
2230
2231         return snap;
2232
2233 err:
2234         kfree(snap->name);
2235         kfree(snap);
2236
2237         return ERR_PTR(ret);
2238 }
2239
2240 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2241                 u64 *snap_size, u64 *snap_features)
2242 {
2243         char *snap_name;
2244
2245         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2246
2247         *snap_size = rbd_dev->header.snap_sizes[which];
2248         *snap_features = 0;     /* No features for v1 */
2249
2250         /* Skip over names until we find the one we are looking for */
2251
2252         snap_name = rbd_dev->header.snap_names;
2253         while (which--)
2254                 snap_name += strlen(snap_name) + 1;
2255
2256         return snap_name;
2257 }
2258
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);	/* wire (little-endian) form */
	int ret;
	/* Reply layout of the "get_size" class method: order byte + size */
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
2292
2293 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2294 {
2295         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2296                                         &rbd_dev->header.obj_order,
2297                                         &rbd_dev->header.image_size);
2298 }
2299
/*
 * Fetch the object name prefix for a format 2 image via the
 * "get_object_prefix" class method and store it in the in-core
 * header.  Returns 0 or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	/* The reply is a length-prefixed string; decode and duplicate it */
	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2337
/*
 * Get the feature bits for an image snapshot (or, with CEPH_NOSNAP,
 * for the base image).  Fails with -ENOTSUPP if the server reports
 * any incompatible feature bit this client does not know about.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);	/* wire (little-endian) form */
	/* Reply layout of the "get_features" class method */
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	/* Refuse to use the image if it needs features we don't support */
	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENOTSUPP;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2371
2372 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2373 {
2374         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2375                                                 &rbd_dev->header.features);
2376 }
2377
2378 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2379 {
2380         size_t size;
2381         int ret;
2382         void *reply_buf;
2383         void *p;
2384         void *end;
2385         u64 seq;
2386         u32 snap_count;
2387         struct ceph_snap_context *snapc;
2388         u32 i;
2389
2390         /*
2391          * We'll need room for the seq value (maximum snapshot id),
2392          * snapshot count, and array of that many snapshot ids.
2393          * For now we have a fixed upper limit on the number we're
2394          * prepared to receive.
2395          */
2396         size = sizeof (__le64) + sizeof (__le32) +
2397                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2398         reply_buf = kzalloc(size, GFP_KERNEL);
2399         if (!reply_buf)
2400                 return -ENOMEM;
2401
2402         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2403                                 "rbd", "get_snapcontext",
2404                                 NULL, 0,
2405                                 reply_buf, size,
2406                                 CEPH_OSD_FLAG_READ, ver);
2407         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2408         if (ret < 0)
2409                 goto out;
2410
2411         ret = -ERANGE;
2412         p = reply_buf;
2413         end = (char *) reply_buf + size;
2414         ceph_decode_64_safe(&p, end, seq, out);
2415         ceph_decode_32_safe(&p, end, snap_count, out);
2416
2417         /*
2418          * Make sure the reported number of snapshot ids wouldn't go
2419          * beyond the end of our buffer.  But before checking that,
2420          * make sure the computed size of the snapshot context we
2421          * allocate is representable in a size_t.
2422          */
2423         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2424                                  / sizeof (u64)) {
2425                 ret = -EINVAL;
2426                 goto out;
2427         }
2428         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2429                 goto out;
2430
2431         size = sizeof (struct ceph_snap_context) +
2432                                 snap_count * sizeof (snapc->snaps[0]);
2433         snapc = kmalloc(size, GFP_KERNEL);
2434         if (!snapc) {
2435                 ret = -ENOMEM;
2436                 goto out;
2437         }
2438
2439         atomic_set(&snapc->nref, 1);
2440         snapc->seq = seq;
2441         snapc->num_snaps = snap_count;
2442         for (i = 0; i < snap_count; i++)
2443                 snapc->snaps[i] = ceph_decode_64(&p);
2444
2445         rbd_dev->header.snapc = snapc;
2446
2447         dout("  snap context seq = %llu, snap_count = %u\n",
2448                 (unsigned long long) seq, (unsigned int) snap_count);
2449
2450 out:
2451         kfree(reply_buf);
2452
2453         return 0;
2454 }
2455
/*
 * Fetch the name of the "which"th snapshot of a format 2 image via
 * the "get_snapshot_name" class method.  Returns a newly-allocated
 * string (caller frees) or an ERR_PTR.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	/* Room for a length prefix plus the maximum name length */
	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* Decode the length-prefixed name into a fresh allocation */
	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
2499
2500 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2501                 u64 *snap_size, u64 *snap_features)
2502 {
2503         __le64 snap_id;
2504         u8 order;
2505         int ret;
2506
2507         snap_id = rbd_dev->header.snapc->snaps[which];
2508         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2509         if (ret)
2510                 return ERR_PTR(ret);
2511         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2512         if (ret)
2513                 return ERR_PTR(ret);
2514
2515         return rbd_dev_v2_snap_name(rbd_dev, which);
2516 }
2517
2518 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2519                 u64 *snap_size, u64 *snap_features)
2520 {
2521         if (rbd_dev->image_format == 1)
2522                 return rbd_dev_v1_snap_info(rbd_dev, which,
2523                                         snap_size, snap_features);
2524         if (rbd_dev->image_format == 2)
2525                 return rbd_dev_v2_snap_info(rbd_dev, which,
2526                                         snap_size, snap_features);
2527         return ERR_PTR(-EINVAL);
2528 }
2529
2530 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2531 {
2532         int ret;
2533         __u8 obj_order;
2534
2535         down_write(&rbd_dev->header_rwsem);
2536
2537         /* Grab old order first, to see if it changes */
2538
2539         obj_order = rbd_dev->header.obj_order,
2540         ret = rbd_dev_v2_image_size(rbd_dev);
2541         if (ret)
2542                 goto out;
2543         if (rbd_dev->header.obj_order != obj_order) {
2544                 ret = -EIO;
2545                 goto out;
2546         }
2547         rbd_update_mapping_size(rbd_dev);
2548
2549         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2550         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2551         if (ret)
2552                 goto out;
2553         ret = rbd_dev_snaps_update(rbd_dev);
2554         dout("rbd_dev_snaps_update returned %d\n", ret);
2555         if (ret)
2556                 goto out;
2557         ret = rbd_dev_snaps_register(rbd_dev);
2558         dout("rbd_dev_snaps_register returned %d\n", ret);
2559 out:
2560         up_write(&rbd_dev->header_rwsem);
2561
2562         return ret;
2563 }
2564
2565 /*
2566  * Scan the rbd device's current snapshot list and compare it to the
2567  * newly-received snapshot context.  Remove any existing snapshots
2568  * not present in the new snapshot context.  Add a new snapshot for
2569  * any snaphots in the snapshot context not in the current list.
2570  * And verify there are no changes to snapshots we already know
2571  * about.
2572  *
2573  * Assumes the snapshots in the snapshot context are sorted by
2574  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2575  * are also maintained in that order.)
2576  */
2577 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2578 {
2579         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2580         const u32 snap_count = snapc->num_snaps;
2581         struct list_head *head = &rbd_dev->snaps;
2582         struct list_head *links = head->next;
2583         u32 index = 0;
2584
2585         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2586         while (index < snap_count || links != head) {
2587                 u64 snap_id;
2588                 struct rbd_snap *snap;
2589                 char *snap_name;
2590                 u64 snap_size = 0;
2591                 u64 snap_features = 0;
2592
2593                 snap_id = index < snap_count ? snapc->snaps[index]
2594                                              : CEPH_NOSNAP;
2595                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2596                                      : NULL;
2597                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2598
2599                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2600                         struct list_head *next = links->next;
2601
2602                         /* Existing snapshot not in the new snap context */
2603
2604                         if (rbd_dev->spec->snap_id == snap->id)
2605                                 rbd_dev->exists = false;
2606                         rbd_remove_snap_dev(snap);
2607                         dout("%ssnap id %llu has been removed\n",
2608                                 rbd_dev->spec->snap_id == snap->id ?
2609                                                         "mapped " : "",
2610                                 (unsigned long long) snap->id);
2611
2612                         /* Done with this list entry; advance */
2613
2614                         links = next;
2615                         continue;
2616                 }
2617
2618                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2619                                         &snap_size, &snap_features);
2620                 if (IS_ERR(snap_name))
2621                         return PTR_ERR(snap_name);
2622
2623                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2624                         (unsigned long long) snap_id);
2625                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2626                         struct rbd_snap *new_snap;
2627
2628                         /* We haven't seen this snapshot before */
2629
2630                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2631                                         snap_id, snap_size, snap_features);
2632                         if (IS_ERR(new_snap)) {
2633                                 int err = PTR_ERR(new_snap);
2634
2635                                 dout("  failed to add dev, error %d\n", err);
2636
2637                                 return err;
2638                         }
2639
2640                         /* New goes before existing, or at end of list */
2641
2642                         dout("  added dev%s\n", snap ? "" : " at end\n");
2643                         if (snap)
2644                                 list_add_tail(&new_snap->node, &snap->node);
2645                         else
2646                                 list_add_tail(&new_snap->node, head);
2647                 } else {
2648                         /* Already have this one */
2649
2650                         dout("  already present\n");
2651
2652                         rbd_assert(snap->size == snap_size);
2653                         rbd_assert(!strcmp(snap->name, snap_name));
2654                         rbd_assert(snap->features == snap_features);
2655
2656                         /* Done with this list entry; advance */
2657
2658                         links = links->next;
2659                 }
2660
2661                 /* Advance to the next entry in the snapshot context */
2662
2663                 index++;
2664         }
2665         dout("%s: done\n", __func__);
2666
2667         return 0;
2668 }
2669
2670 /*
2671  * Scan the list of snapshots and register the devices for any that
2672  * have not already been registered.
2673  */
2674 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2675 {
2676         struct rbd_snap *snap;
2677         int ret = 0;
2678
2679         dout("%s called\n", __func__);
2680         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2681                 return -EIO;
2682
2683         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2684                 if (!rbd_snap_registered(snap)) {
2685                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2686                         if (ret < 0)
2687                                 break;
2688                 }
2689         }
2690         dout("%s: returning %d\n", __func__, ret);
2691
2692         return ret;
2693 }
2694
2695 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2696 {
2697         struct device *dev;
2698         int ret;
2699
2700         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2701
2702         dev = &rbd_dev->dev;
2703         dev->bus = &rbd_bus_type;
2704         dev->type = &rbd_device_type;
2705         dev->parent = &rbd_root_dev;
2706         dev->release = rbd_dev_release;
2707         dev_set_name(dev, "%d", rbd_dev->dev_id);
2708         ret = device_register(dev);
2709
2710         mutex_unlock(&ctl_mutex);
2711
2712         return ret;
2713 }
2714
/*
 * Unregister the device; the remaining teardown happens in the
 * release callback (rbd_dev_release) installed by rbd_bus_add_dev().
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2719
2720 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2721 {
2722         int ret, rc;
2723
2724         do {
2725                 ret = rbd_req_sync_watch(rbd_dev);
2726                 if (ret == -ERANGE) {
2727                         rc = rbd_dev_refresh(rbd_dev, NULL);
2728                         if (rc < 0)
2729                                 return rc;
2730                 }
2731         } while (ret == -ERANGE);
2732
2733         return ret;
2734 }
2735
2736 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2737
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() means the first id handed out is 1 */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	/* rbd_dev_list_lock protects all walkers of rbd_dev_list */
	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
2752
2753 /*
2754  * Remove an rbd_dev from the global list, and record that its
2755  * identifier is no longer in use.
2756  */
2757 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2758 {
2759         struct list_head *tmp;
2760         int rbd_id = rbd_dev->dev_id;
2761         int max_id;
2762
2763         rbd_assert(rbd_id > 0);
2764
2765         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2766                 (unsigned long long) rbd_dev->dev_id);
2767         spin_lock(&rbd_dev_list_lock);
2768         list_del_init(&rbd_dev->node);
2769
2770         /*
2771          * If the id being "put" is not the current maximum, there
2772          * is nothing special we need to do.
2773          */
2774         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2775                 spin_unlock(&rbd_dev_list_lock);
2776                 return;
2777         }
2778
2779         /*
2780          * We need to update the current maximum id.  Search the
2781          * list to find out what it is.  We're more likely to find
2782          * the maximum at the end, so search the list backward.
2783          */
2784         max_id = 0;
2785         list_for_each_prev(tmp, &rbd_dev_list) {
2786                 struct rbd_device *rbd_dev;
2787
2788                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2789                 if (rbd_dev->dev_id > max_id)
2790                         max_id = rbd_dev->dev_id;
2791         }
2792         spin_unlock(&rbd_dev_list_lock);
2793
2794         /*
2795          * The max id could have been updated by rbd_dev_id_get(), in
2796          * which case it now accurately reflects the new maximum.
2797          * Be careful not to overwrite the maximum value in that
2798          * case.
2799          */
2800         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2801         dout("  max dev id has been reset\n");
2802 }
2803
/*
 * Advance *buf past any leading white space, leaving it pointing at
 * the first non-space character (if any), and return the length of
 * the token (maximal run of non-space characters) found there.
 * The string at *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";
	size_t skip;

	skip = strspn(*buf, spaces);	/* Find start of token */
	*buf += skip;

	return strcspn(*buf, spaces);	/* Return token length */
}
2822
/*
 * Find the next token in *buf and, if the caller's buffer is big
 * enough to hold it plus a terminating '\0', copy it there.
 *
 * Returns the token's length (excluding the '\0'): 0 when no token
 * was found, >= token_size when the token did not fit (in which
 * case nothing was copied).
 *
 * *buf is advanced past the token in every case, even when the
 * token was too large to copy.  *buf must be '\0'-terminated on
 * entry.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2852
2853 /*
2854  * Finds the next token in *buf, dynamically allocates a buffer big
2855  * enough to hold a copy of it, and copies the token into the new
2856  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2857  * that a duplicate buffer is created even for a zero-length token.
2858  *
2859  * Returns a pointer to the newly-allocated duplicate, or a null
2860  * pointer if memory for the duplicate was not available.  If
2861  * the lenp argument is a non-null pointer, the length of the token
2862  * (not including the '\0') is returned in *lenp.
2863  *
2864  * If successful, the *buf pointer will be updated to point beyond
2865  * the end of the found token.
2866  *
2867  * Note: uses GFP_KERNEL for allocation.
2868  */
2869 static inline char *dup_token(const char **buf, size_t *lenp)
2870 {
2871         char *dup;
2872         size_t len;
2873
2874         len = next_token(buf);
2875         dup = kmalloc(len + 1, GFP_KERNEL);
2876         if (!dup)
2877                 return NULL;
2878
2879         memcpy(dup, *buf, len);
2880         *(dup + len) = '\0';
2881         *buf += len;
2882
2883         if (lenp)
2884                 *lenp = len;
2885
2886         return dup;
2887 }
2888
2889 /*
2890  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2891  * rbd_md_name, and name fields of the given rbd_dev, based on the
2892  * list of monitor addresses and other options provided via
2893  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2894  * copy of the snapshot name to map if successful, or a
2895  * pointer-coded error otherwise.
2896  *
2897  * Note: rbd_dev is assumed to have been initially zero-filled.
2898  */
2899 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2900                                 const char *buf,
2901                                 struct ceph_options **ceph_opts,
2902                                 struct rbd_options **opts)
2903 {
2904         size_t len;
2905         const char *mon_addrs;
2906         size_t mon_addrs_size;
2907         char *options;
2908         struct rbd_options *rbd_opts = NULL;
2909         int ret;
2910
2911         /* The first four tokens are required */
2912
2913         len = next_token(&buf);
2914         if (!len)
2915                 return -EINVAL; /* Missing monitor address(es) */
2916         mon_addrs = buf;
2917         mon_addrs_size = len + 1;
2918         buf += len;
2919
2920         ret = -EINVAL;
2921         options = dup_token(&buf, NULL);
2922         if (!options)
2923                 return -ENOMEM;
2924         if (!*options)
2925                 goto out_err;   /* Missing options */
2926
2927         rbd_dev->spec->pool_name = dup_token(&buf, NULL);
2928         if (!rbd_dev->spec->pool_name)
2929                 goto out_mem;
2930         if (!*rbd_dev->spec->pool_name)
2931                 goto out_err;   /* Missing pool name */
2932
2933         rbd_dev->spec->image_name =
2934                 dup_token(&buf, &rbd_dev->spec->image_name_len);
2935         if (!rbd_dev->spec->image_name)
2936                 goto out_mem;
2937         if (!*rbd_dev->spec->image_name)
2938                 goto out_err;   /* Missing image name */
2939
2940         /*
2941          * Snapshot name is optional; default is to use "-"
2942          * (indicating the head/no snapshot).
2943          */
2944         len = next_token(&buf);
2945         if (!len) {
2946                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2947                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2948         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
2949                 ret = -ENAMETOOLONG;
2950                 goto out_err;
2951         }
2952         rbd_dev->spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
2953         if (!rbd_dev->spec->snap_name)
2954                 goto out_mem;
2955         memcpy(rbd_dev->spec->snap_name, buf, len);
2956         *(rbd_dev->spec->snap_name + len) = '\0';
2957
2958         /* Initialize all rbd options to the defaults */
2959
2960         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
2961         if (!rbd_opts)
2962                 goto out_mem;
2963
2964         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
2965
2966         *ceph_opts = ceph_parse_options(options, mon_addrs,
2967                                         mon_addrs + mon_addrs_size - 1,
2968                                         parse_rbd_opts_token, rbd_opts);
2969         kfree(options);
2970         if (IS_ERR(*ceph_opts)) {
2971                 ret = PTR_ERR(*ceph_opts);
2972                 goto out_err;
2973         }
2974         *opts = rbd_opts;
2975
2976         return 0;
2977 out_mem:
2978         ret = -ENOMEM;
2979 out_err:
2980         kfree(rbd_dev->spec->image_name);
2981         rbd_dev->spec->image_name = NULL;
2982         rbd_dev->spec->image_name_len = 0;
2983         kfree(rbd_dev->spec->pool_name);
2984         rbd_dev->spec->pool_name = NULL;
2985         kfree(options);
2986
2987         return ret;
2988 }
2989
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	/* Object name is "<RBD_ID_PREFIX><image_name>" */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Invoke the class method that maps the image name to its id */
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	/* Decode the (length-prefixed) id string from the response */
	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->spec->image_id_len,
						GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
3059
3060 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3061 {
3062         int ret;
3063         size_t size;
3064
3065         /* Version 1 images have no id; empty string is used */
3066
3067         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3068         if (!rbd_dev->spec->image_id)
3069                 return -ENOMEM;
3070         rbd_dev->spec->image_id_len = 0;
3071
3072         /* Record the header object name for this rbd image. */
3073
3074         size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3075         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3076         if (!rbd_dev->header_name) {
3077                 ret = -ENOMEM;
3078                 goto out_err;
3079         }
3080         sprintf(rbd_dev->header_name, "%s%s",
3081                 rbd_dev->spec->image_name, RBD_SUFFIX);
3082
3083         /* Populate rbd image metadata */
3084
3085         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3086         if (ret < 0)
3087                 goto out_err;
3088         rbd_dev->image_format = 1;
3089
3090         dout("discovered version 1 image, header name is %s\n",
3091                 rbd_dev->header_name);
3092
3093         return 0;
3094
3095 out_err:
3096         kfree(rbd_dev->header_name);
3097         rbd_dev->header_name = NULL;
3098         kfree(rbd_dev->spec->image_id);
3099         rbd_dev->spec->image_id = NULL;
3100
3101         return ret;
3102 }
3103
/*
 * Finish probing a format 2 image: build the header object name
 * from the (already-determined) image id, then fetch the image
 * size, object prefix, features and snapshot context from the
 * cluster.  On error everything allocated here is freed again.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3165
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * A format 2 image records its id in a special object.  If
	 * looking that id up fails (e.g. with ENOENT), assume we
	 * are dealing with a format 1 image instead.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	ret = ret ? rbd_dev_v1_probe(rbd_dev) : rbd_dev_v2_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3190
3191 static ssize_t rbd_add(struct bus_type *bus,
3192                        const char *buf,
3193                        size_t count)
3194 {
3195         struct rbd_device *rbd_dev = NULL;
3196         struct ceph_options *ceph_opts = NULL;
3197         struct rbd_options *rbd_opts = NULL;
3198         struct ceph_osd_client *osdc;
3199         int rc = -ENOMEM;
3200
3201         if (!try_module_get(THIS_MODULE))
3202                 return -ENODEV;
3203
3204         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3205         if (!rbd_dev)
3206                 return -ENOMEM;
3207         rbd_dev->spec = rbd_spec_alloc();
3208         if (!rbd_dev->spec)
3209                 goto err_out_mem;
3210
3211         /* static rbd_device initialization */
3212         spin_lock_init(&rbd_dev->lock);
3213         INIT_LIST_HEAD(&rbd_dev->node);
3214         INIT_LIST_HEAD(&rbd_dev->snaps);
3215         init_rwsem(&rbd_dev->header_rwsem);
3216
3217         /* parse add command */
3218         rc = rbd_add_parse_args(rbd_dev, buf, &ceph_opts, &rbd_opts);
3219         if (rc < 0)
3220                 goto err_out_mem;
3221         rbd_dev->mapping.read_only = rbd_opts->read_only;
3222
3223         rc = rbd_get_client(rbd_dev, ceph_opts);
3224         if (rc < 0)
3225                 goto err_out_args;
3226         ceph_opts = NULL;       /* ceph_opts now owned by rbd_dev client */
3227
3228         /* pick the pool */
3229         osdc = &rbd_dev->rbd_client->client->osdc;
3230         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->spec->pool_name);
3231         if (rc < 0)
3232                 goto err_out_client;
3233         rbd_dev->spec->pool_id = (u64) rc;
3234
3235         rc = rbd_dev_probe(rbd_dev);
3236         if (rc < 0)
3237                 goto err_out_client;
3238
3239         /* no need to lock here, as rbd_dev is not registered yet */
3240         rc = rbd_dev_snaps_update(rbd_dev);
3241         if (rc)
3242                 goto err_out_probe;
3243
3244         rc = rbd_dev_set_mapping(rbd_dev);
3245         if (rc)
3246                 goto err_out_snaps;
3247
3248         /* generate unique id: find highest unique id, add one */
3249         rbd_dev_id_get(rbd_dev);
3250
3251         /* Fill in the device name, now that we have its id. */
3252         BUILD_BUG_ON(DEV_NAME_LEN
3253                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3254         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3255
3256         /* Get our block major device number. */
3257
3258         rc = register_blkdev(0, rbd_dev->name);
3259         if (rc < 0)
3260                 goto err_out_id;
3261         rbd_dev->major = rc;
3262
3263         /* Set up the blkdev mapping. */
3264
3265         rc = rbd_init_disk(rbd_dev);
3266         if (rc)
3267                 goto err_out_blkdev;
3268
3269         rc = rbd_bus_add_dev(rbd_dev);
3270         if (rc)
3271                 goto err_out_disk;
3272
3273         /*
3274          * At this point cleanup in the event of an error is the job
3275          * of the sysfs code (initiated by rbd_bus_del_dev()).
3276          */
3277
3278         down_write(&rbd_dev->header_rwsem);
3279         rc = rbd_dev_snaps_register(rbd_dev);
3280         up_write(&rbd_dev->header_rwsem);
3281         if (rc)
3282                 goto err_out_bus;
3283
3284         rc = rbd_init_watch_dev(rbd_dev);
3285         if (rc)
3286                 goto err_out_bus;
3287
3288         kfree(rbd_opts);
3289
3290         /* Everything's ready.  Announce the disk to the world. */
3291
3292         add_disk(rbd_dev->disk);
3293
3294         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3295                 (unsigned long long) rbd_dev->mapping.size);
3296
3297         return count;
3298
3299 err_out_bus:
3300         /* this will also clean up rest of rbd_dev stuff */
3301
3302         rbd_bus_del_dev(rbd_dev);
3303         kfree(rbd_opts);
3304
3305         return rc;
3306
3307 err_out_disk:
3308         rbd_free_disk(rbd_dev);
3309 err_out_blkdev:
3310         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3311 err_out_id:
3312         rbd_dev_id_put(rbd_dev);
3313 err_out_snaps:
3314         rbd_remove_all_snaps(rbd_dev);
3315 err_out_probe:
3316         rbd_header_free(&rbd_dev->header);
3317 err_out_client:
3318         kfree(rbd_dev->header_name);
3319         rbd_put_client(rbd_dev);
3320 err_out_args:
3321         if (ceph_opts)
3322                 ceph_destroy_options(ceph_opts);
3323         kfree(rbd_opts);
3324 err_out_mem:
3325         rbd_spec_put(rbd_dev->spec);
3326         kfree(rbd_dev);
3327
3328         dout("Error adding device %s\n", buf);
3329         module_put(THIS_MODULE);
3330
3331         return (ssize_t) rc;
3332 }
3333
3334 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3335 {
3336         struct list_head *tmp;
3337         struct rbd_device *rbd_dev;
3338
3339         spin_lock(&rbd_dev_list_lock);
3340         list_for_each(tmp, &rbd_dev_list) {
3341                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3342                 if (rbd_dev->dev_id == dev_id) {
3343                         spin_unlock(&rbd_dev_list_lock);
3344                         return rbd_dev;
3345                 }
3346         }
3347         spin_unlock(&rbd_dev_list_lock);
3348         return NULL;
3349 }
3350
/*
 * Device release callback, run by the driver core once the last
 * reference to rbd_dev->dev is dropped (set up in rbd_bus_add_dev()).
 * Tears down what rbd_add() built, in roughly reverse order.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object before dropping the client */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->header_name);
	rbd_dev_id_put(rbd_dev);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
3382
3383 static ssize_t rbd_remove(struct bus_type *bus,
3384                           const char *buf,
3385                           size_t count)
3386 {
3387         struct rbd_device *rbd_dev = NULL;
3388         int target_id, rc;
3389         unsigned long ul;
3390         int ret = count;
3391
3392         rc = strict_strtoul(buf, 10, &ul);
3393         if (rc)
3394                 return rc;
3395
3396         /* convert to int; abort if we lost anything in the conversion */
3397         target_id = (int) ul;
3398         if (target_id != ul)
3399                 return -EINVAL;
3400
3401         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3402
3403         rbd_dev = __rbd_get_dev(target_id);
3404         if (!rbd_dev) {
3405                 ret = -ENOENT;
3406                 goto done;
3407         }
3408
3409         rbd_remove_all_snaps(rbd_dev);
3410         rbd_bus_del_dev(rbd_dev);
3411
3412 done:
3413         mutex_unlock(&ctl_mutex);
3414
3415         return ret;
3416 }
3417
3418 /*
3419  * create control files in sysfs
3420  * /sys/bus/rbd/...
3421  */
3422 static int rbd_sysfs_init(void)
3423 {
3424         int ret;
3425
3426         ret = device_register(&rbd_root_dev);
3427         if (ret < 0)
3428                 return ret;
3429
3430         ret = bus_register(&rbd_bus_type);
3431         if (ret < 0)
3432                 device_unregister(&rbd_root_dev);
3433
3434         return ret;
3435 }
3436
/* Remove the sysfs control files, in reverse order of creation. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
3442
3443 int __init rbd_init(void)
3444 {
3445         int rc;
3446
3447         rc = rbd_sysfs_init();
3448         if (rc)
3449                 return rc;
3450         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3451         return 0;
3452 }
3453
/* Module exit: remove the sysfs control files created by rbd_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
3458
3459 module_init(rbd_init);
3460 module_exit(rbd_exit);
3461
3462 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3463 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3464 MODULE_DESCRIPTION("rados block device");
3465
3466 /* following authorship retained from original osdblk.c */
3467 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3468
3469 MODULE_LICENSE("GPL");