]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: rename snap_exists field
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
98 struct rbd_image_header {
99         /* These four fields never change for a given rbd image */
100         char *object_prefix;
101         u64 features;
102         __u8 obj_order;
103         __u8 crypt_type;
104         __u8 comp_type;
105
106         /* The remaining fields need to be updated occasionally */
107         u64 image_size;
108         struct ceph_snap_context *snapc;
109         char *snap_names;
110         u64 *snap_sizes;
111
112         u64 obj_version;
113 };
114
115 struct rbd_options {
116         bool    read_only;
117 };
118
119 /*
120  * an instance of the client.  multiple devices may share an rbd client.
121  */
122 struct rbd_client {
123         struct ceph_client      *client;
124         struct kref             kref;
125         struct list_head        node;
126 };
127
128 /*
129  * a request completion status
130  */
131 struct rbd_req_status {
132         int done;
133         int rc;
134         u64 bytes;
135 };
136
137 /*
138  * a collection of requests
139  */
140 struct rbd_req_coll {
141         int                     total;
142         int                     num_done;
143         struct kref             kref;
144         struct rbd_req_status   status[0];
145 };
146
147 /*
148  * a single io request
149  */
150 struct rbd_request {
151         struct request          *rq;            /* blk layer request */
152         struct bio              *bio;           /* cloned bio */
153         struct page             **pages;        /* list of used pages */
154         u64                     len;
155         int                     coll_index;
156         struct rbd_req_coll     *coll;
157 };
158
159 struct rbd_snap {
160         struct  device          dev;
161         const char              *name;
162         u64                     size;
163         struct list_head        node;
164         u64                     id;
165         u64                     features;
166 };
167
168 struct rbd_mapping {
169         u64                     size;
170         u64                     features;
171         bool                    read_only;
172 };
173
174 /*
175  * a single device
176  */
177 struct rbd_device {
178         int                     dev_id;         /* blkdev unique id */
179
180         int                     major;          /* blkdev assigned major */
181         struct gendisk          *disk;          /* blkdev's gendisk and rq */
182
183         u32                     image_format;   /* Either 1 or 2 */
184         struct rbd_client       *rbd_client;
185
186         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
187
188         spinlock_t              lock;           /* queue lock */
189
190         struct rbd_image_header header;
191         bool                    exists;
192         char                    *image_id;
193         size_t                  image_id_len;
194         char                    *image_name;
195         size_t                  image_name_len;
196         char                    *header_name;
197         char                    *pool_name;
198         u64                     pool_id;
199
200         char                    *snap_name;
201         u64                     snap_id;
202
203         struct ceph_osd_event   *watch_event;
204         struct ceph_osd_request *watch_request;
205
206         /* protects updating the header */
207         struct rw_semaphore     header_rwsem;
208
209         struct rbd_mapping      mapping;
210
211         struct list_head        node;
212
213         /* list of snapshots */
214         struct list_head        snaps;
215
216         /* sysfs related */
217         struct device           dev;
218 };
219
220 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
221
222 static LIST_HEAD(rbd_dev_list);    /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
224
225 static LIST_HEAD(rbd_client_list);              /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
227
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
230
231 static void rbd_dev_release(struct device *dev);
232 static void rbd_remove_snap_dev(struct rbd_snap *snap);
233
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
235                        size_t count);
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
237                           size_t count);
238
239 static struct bus_attribute rbd_bus_attrs[] = {
240         __ATTR(add, S_IWUSR, NULL, rbd_add),
241         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
242         __ATTR_NULL
243 };
244
245 static struct bus_type rbd_bus_type = {
246         .name           = "rbd",
247         .bus_attrs      = rbd_bus_attrs,
248 };
249
250 static void rbd_root_dev_release(struct device *dev)
251 {
252 }
253
254 static struct device rbd_root_dev = {
255         .init_name =    "rbd",
256         .release =      rbd_root_dev_release,
257 };
258
259 #ifdef RBD_DEBUG
260 #define rbd_assert(expr)                                                \
261                 if (unlikely(!(expr))) {                                \
262                         printk(KERN_ERR "\nAssertion failure in %s() "  \
263                                                 "at line %d:\n\n"       \
264                                         "\trbd_assert(%s);\n\n",        \
265                                         __func__, __LINE__, #expr);     \
266                         BUG();                                          \
267                 }
268 #else /* !RBD_DEBUG */
269 #  define rbd_assert(expr)      ((void) 0)
270 #endif /* !RBD_DEBUG */
271
272 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
273 {
274         return get_device(&rbd_dev->dev);
275 }
276
277 static void rbd_put_dev(struct rbd_device *rbd_dev)
278 {
279         put_device(&rbd_dev->dev);
280 }
281
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
284
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
286 {
287         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
288
289         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
290                 return -EROFS;
291
292         rbd_get_dev(rbd_dev);
293         set_device_ro(bdev, rbd_dev->mapping.read_only);
294
295         return 0;
296 }
297
298 static int rbd_release(struct gendisk *disk, fmode_t mode)
299 {
300         struct rbd_device *rbd_dev = disk->private_data;
301
302         rbd_put_dev(rbd_dev);
303
304         return 0;
305 }
306
307 static const struct block_device_operations rbd_bd_ops = {
308         .owner                  = THIS_MODULE,
309         .open                   = rbd_open,
310         .release                = rbd_release,
311 };
312
313 /*
314  * Initialize an rbd client instance.
315  * We own *ceph_opts.
316  */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 {
319         struct rbd_client *rbdc;
320         int ret = -ENOMEM;
321
322         dout("rbd_client_create\n");
323         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
324         if (!rbdc)
325                 goto out_opt;
326
327         kref_init(&rbdc->kref);
328         INIT_LIST_HEAD(&rbdc->node);
329
330         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331
332         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333         if (IS_ERR(rbdc->client))
334                 goto out_mutex;
335         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336
337         ret = ceph_open_session(rbdc->client);
338         if (ret < 0)
339                 goto out_err;
340
341         spin_lock(&rbd_client_list_lock);
342         list_add_tail(&rbdc->node, &rbd_client_list);
343         spin_unlock(&rbd_client_list_lock);
344
345         mutex_unlock(&ctl_mutex);
346
347         dout("rbd_client_create created %p\n", rbdc);
348         return rbdc;
349
350 out_err:
351         ceph_destroy_client(rbdc->client);
352 out_mutex:
353         mutex_unlock(&ctl_mutex);
354         kfree(rbdc);
355 out_opt:
356         if (ceph_opts)
357                 ceph_destroy_options(ceph_opts);
358         return ERR_PTR(ret);
359 }
360
361 /*
362  * Find a ceph client with specific addr and configuration.  If
363  * found, bump its reference count.
364  */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
366 {
367         struct rbd_client *client_node;
368         bool found = false;
369
370         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
371                 return NULL;
372
373         spin_lock(&rbd_client_list_lock);
374         list_for_each_entry(client_node, &rbd_client_list, node) {
375                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376                         kref_get(&client_node->kref);
377                         found = true;
378                         break;
379                 }
380         }
381         spin_unlock(&rbd_client_list_lock);
382
383         return found ? client_node : NULL;
384 }
385
386 /*
387  * mount options
388  */
389 enum {
390         Opt_last_int,
391         /* int args above */
392         Opt_last_string,
393         /* string args above */
394         Opt_read_only,
395         Opt_read_write,
396         /* Boolean args above */
397         Opt_last_bool,
398 };
399
400 static match_table_t rbd_opts_tokens = {
401         /* int args above */
402         /* string args above */
403         {Opt_read_only, "read_only"},
404         {Opt_read_only, "ro"},          /* Alternate spelling */
405         {Opt_read_write, "read_write"},
406         {Opt_read_write, "rw"},         /* Alternate spelling */
407         /* Boolean args above */
408         {-1, NULL}
409 };
410
411 static int parse_rbd_opts_token(char *c, void *private)
412 {
413         struct rbd_options *rbd_opts = private;
414         substring_t argstr[MAX_OPT_ARGS];
415         int token, intval, ret;
416
417         token = match_token(c, rbd_opts_tokens, argstr);
418         if (token < 0)
419                 return -EINVAL;
420
421         if (token < Opt_last_int) {
422                 ret = match_int(&argstr[0], &intval);
423                 if (ret < 0) {
424                         pr_err("bad mount option arg (not int) "
425                                "at '%s'\n", c);
426                         return ret;
427                 }
428                 dout("got int token %d val %d\n", token, intval);
429         } else if (token > Opt_last_int && token < Opt_last_string) {
430                 dout("got string token %d val %s\n", token,
431                      argstr[0].from);
432         } else if (token > Opt_last_string && token < Opt_last_bool) {
433                 dout("got Boolean token %d\n", token);
434         } else {
435                 dout("got token %d\n", token);
436         }
437
438         switch (token) {
439         case Opt_read_only:
440                 rbd_opts->read_only = true;
441                 break;
442         case Opt_read_write:
443                 rbd_opts->read_only = false;
444                 break;
445         default:
446                 rbd_assert(false);
447                 break;
448         }
449         return 0;
450 }
451
452 /*
453  * Get a ceph client with specific addr and configuration, if one does
454  * not exist create it.
455  */
456 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
457                                 size_t mon_addr_len, char *options)
458 {
459         struct rbd_options rbd_opts;
460         struct ceph_options *ceph_opts;
461         struct rbd_client *rbdc;
462
463         /* Initialize all rbd options to the defaults */
464
465         rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
466
467         ceph_opts = ceph_parse_options(options, mon_addr,
468                                         mon_addr + mon_addr_len,
469                                         parse_rbd_opts_token, &rbd_opts);
470         if (IS_ERR(ceph_opts))
471                 return PTR_ERR(ceph_opts);
472
473         /* Record the parsed rbd options */
474
475         rbd_dev->mapping.read_only = rbd_opts.read_only;
476
477         rbdc = rbd_client_find(ceph_opts);
478         if (rbdc) {
479                 /* using an existing client */
480                 ceph_destroy_options(ceph_opts);
481         } else {
482                 rbdc = rbd_client_create(ceph_opts);
483                 if (IS_ERR(rbdc))
484                         return PTR_ERR(rbdc);
485         }
486         rbd_dev->rbd_client = rbdc;
487
488         return 0;
489 }
490
491 /*
492  * Destroy ceph client
493  *
494  * Caller must hold rbd_client_list_lock.
495  */
496 static void rbd_client_release(struct kref *kref)
497 {
498         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
499
500         dout("rbd_release_client %p\n", rbdc);
501         spin_lock(&rbd_client_list_lock);
502         list_del(&rbdc->node);
503         spin_unlock(&rbd_client_list_lock);
504
505         ceph_destroy_client(rbdc->client);
506         kfree(rbdc);
507 }
508
509 /*
510  * Drop reference to ceph client node. If it's not referenced anymore, release
511  * it.
512  */
513 static void rbd_put_client(struct rbd_device *rbd_dev)
514 {
515         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
516         rbd_dev->rbd_client = NULL;
517 }
518
519 /*
520  * Destroy requests collection
521  */
522 static void rbd_coll_release(struct kref *kref)
523 {
524         struct rbd_req_coll *coll =
525                 container_of(kref, struct rbd_req_coll, kref);
526
527         dout("rbd_coll_release %p\n", coll);
528         kfree(coll);
529 }
530
531 static bool rbd_image_format_valid(u32 image_format)
532 {
533         return image_format == 1 || image_format == 2;
534 }
535
536 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
537 {
538         size_t size;
539         u32 snap_count;
540
541         /* The header has to start with the magic rbd header text */
542         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
543                 return false;
544
545         /* The bio layer requires at least sector-sized I/O */
546
547         if (ondisk->options.order < SECTOR_SHIFT)
548                 return false;
549
550         /* If we use u64 in a few spots we may be able to loosen this */
551
552         if (ondisk->options.order > 8 * sizeof (int) - 1)
553                 return false;
554
555         /*
556          * The size of a snapshot header has to fit in a size_t, and
557          * that limits the number of snapshots.
558          */
559         snap_count = le32_to_cpu(ondisk->snap_count);
560         size = SIZE_MAX - sizeof (struct ceph_snap_context);
561         if (snap_count > size / sizeof (__le64))
562                 return false;
563
564         /*
565          * Not only that, but the size of the entire the snapshot
566          * header must also be representable in a size_t.
567          */
568         size -= snap_count * sizeof (__le64);
569         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
570                 return false;
571
572         return true;
573 }
574
575 /*
576  * Create a new header structure, translate header format from the on-disk
577  * header.
578  */
579 static int rbd_header_from_disk(struct rbd_image_header *header,
580                                  struct rbd_image_header_ondisk *ondisk)
581 {
582         u32 snap_count;
583         size_t len;
584         size_t size;
585         u32 i;
586
587         memset(header, 0, sizeof (*header));
588
589         snap_count = le32_to_cpu(ondisk->snap_count);
590
591         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
592         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
593         if (!header->object_prefix)
594                 return -ENOMEM;
595         memcpy(header->object_prefix, ondisk->object_prefix, len);
596         header->object_prefix[len] = '\0';
597
598         if (snap_count) {
599                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
600
601                 /* Save a copy of the snapshot names */
602
603                 if (snap_names_len > (u64) SIZE_MAX)
604                         return -EIO;
605                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
606                 if (!header->snap_names)
607                         goto out_err;
608                 /*
609                  * Note that rbd_dev_v1_header_read() guarantees
610                  * the ondisk buffer we're working with has
611                  * snap_names_len bytes beyond the end of the
612                  * snapshot id array, this memcpy() is safe.
613                  */
614                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
615                         snap_names_len);
616
617                 /* Record each snapshot's size */
618
619                 size = snap_count * sizeof (*header->snap_sizes);
620                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
621                 if (!header->snap_sizes)
622                         goto out_err;
623                 for (i = 0; i < snap_count; i++)
624                         header->snap_sizes[i] =
625                                 le64_to_cpu(ondisk->snaps[i].image_size);
626         } else {
627                 WARN_ON(ondisk->snap_names_len);
628                 header->snap_names = NULL;
629                 header->snap_sizes = NULL;
630         }
631
632         header->features = 0;   /* No features support in v1 images */
633         header->obj_order = ondisk->options.order;
634         header->crypt_type = ondisk->options.crypt_type;
635         header->comp_type = ondisk->options.comp_type;
636
637         /* Allocate and fill in the snapshot context */
638
639         header->image_size = le64_to_cpu(ondisk->image_size);
640         size = sizeof (struct ceph_snap_context);
641         size += snap_count * sizeof (header->snapc->snaps[0]);
642         header->snapc = kzalloc(size, GFP_KERNEL);
643         if (!header->snapc)
644                 goto out_err;
645
646         atomic_set(&header->snapc->nref, 1);
647         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
648         header->snapc->num_snaps = snap_count;
649         for (i = 0; i < snap_count; i++)
650                 header->snapc->snaps[i] =
651                         le64_to_cpu(ondisk->snaps[i].id);
652
653         return 0;
654
655 out_err:
656         kfree(header->snap_sizes);
657         header->snap_sizes = NULL;
658         kfree(header->snap_names);
659         header->snap_names = NULL;
660         kfree(header->object_prefix);
661         header->object_prefix = NULL;
662
663         return -ENOMEM;
664 }
665
666 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
667 {
668
669         struct rbd_snap *snap;
670
671         list_for_each_entry(snap, &rbd_dev->snaps, node) {
672                 if (!strcmp(snap_name, snap->name)) {
673                         rbd_dev->snap_id = snap->id;
674                         rbd_dev->mapping.size = snap->size;
675                         rbd_dev->mapping.features = snap->features;
676
677                         return 0;
678                 }
679         }
680
681         return -ENOENT;
682 }
683
684 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
685 {
686         int ret;
687
688         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
689                     sizeof (RBD_SNAP_HEAD_NAME))) {
690                 rbd_dev->snap_id = CEPH_NOSNAP;
691                 rbd_dev->mapping.size = rbd_dev->header.image_size;
692                 rbd_dev->mapping.features = rbd_dev->header.features;
693                 ret = 0;
694         } else {
695                 ret = snap_by_name(rbd_dev, snap_name);
696                 if (ret < 0)
697                         goto done;
698                 rbd_dev->mapping.read_only = true;
699         }
700         rbd_dev->snap_name = snap_name;
701         rbd_dev->exists = true;
702 done:
703         return ret;
704 }
705
706 static void rbd_header_free(struct rbd_image_header *header)
707 {
708         kfree(header->object_prefix);
709         header->object_prefix = NULL;
710         kfree(header->snap_sizes);
711         header->snap_sizes = NULL;
712         kfree(header->snap_names);
713         header->snap_names = NULL;
714         ceph_put_snap_context(header->snapc);
715         header->snapc = NULL;
716 }
717
718 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
719 {
720         char *name;
721         u64 segment;
722         int ret;
723
724         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
725         if (!name)
726                 return NULL;
727         segment = offset >> rbd_dev->header.obj_order;
728         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
729                         rbd_dev->header.object_prefix, segment);
730         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
731                 pr_err("error formatting segment name for #%llu (%d)\n",
732                         segment, ret);
733                 kfree(name);
734                 name = NULL;
735         }
736
737         return name;
738 }
739
740 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
741 {
742         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
743
744         return offset & (segment_size - 1);
745 }
746
747 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
748                                 u64 offset, u64 length)
749 {
750         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
751
752         offset &= segment_size - 1;
753
754         rbd_assert(length <= U64_MAX - offset);
755         if (offset + length > segment_size)
756                 length = segment_size - offset;
757
758         return length;
759 }
760
761 static int rbd_get_num_segments(struct rbd_image_header *header,
762                                 u64 ofs, u64 len)
763 {
764         u64 start_seg;
765         u64 end_seg;
766
767         if (!len)
768                 return 0;
769         if (len - 1 > U64_MAX - ofs)
770                 return -ERANGE;
771
772         start_seg = ofs >> header->obj_order;
773         end_seg = (ofs + len - 1) >> header->obj_order;
774
775         return end_seg - start_seg + 1;
776 }
777
778 /*
779  * returns the size of an object in the image
780  */
781 static u64 rbd_obj_bytes(struct rbd_image_header *header)
782 {
783         return 1 << header->obj_order;
784 }
785
786 /*
787  * bio helpers
788  */
789
790 static void bio_chain_put(struct bio *chain)
791 {
792         struct bio *tmp;
793
794         while (chain) {
795                 tmp = chain;
796                 chain = chain->bi_next;
797                 bio_put(tmp);
798         }
799 }
800
801 /*
802  * zeros a bio chain, starting at specific offset
803  */
804 static void zero_bio_chain(struct bio *chain, int start_ofs)
805 {
806         struct bio_vec *bv;
807         unsigned long flags;
808         void *buf;
809         int i;
810         int pos = 0;
811
812         while (chain) {
813                 bio_for_each_segment(bv, chain, i) {
814                         if (pos + bv->bv_len > start_ofs) {
815                                 int remainder = max(start_ofs - pos, 0);
816                                 buf = bvec_kmap_irq(bv, &flags);
817                                 memset(buf + remainder, 0,
818                                        bv->bv_len - remainder);
819                                 bvec_kunmap_irq(buf, &flags);
820                         }
821                         pos += bv->bv_len;
822                 }
823
824                 chain = chain->bi_next;
825         }
826 }
827
828 /*
829  * Clone a portion of a bio, starting at the given byte offset
830  * and continuing for the number of bytes indicated.
831  */
832 static struct bio *bio_clone_range(struct bio *bio_src,
833                                         unsigned int offset,
834                                         unsigned int len,
835                                         gfp_t gfpmask)
836 {
837         struct bio_vec *bv;
838         unsigned int resid;
839         unsigned short idx;
840         unsigned int voff;
841         unsigned short end_idx;
842         unsigned short vcnt;
843         struct bio *bio;
844
845         /* Handle the easy case for the caller */
846
847         if (!offset && len == bio_src->bi_size)
848                 return bio_clone(bio_src, gfpmask);
849
850         if (WARN_ON_ONCE(!len))
851                 return NULL;
852         if (WARN_ON_ONCE(len > bio_src->bi_size))
853                 return NULL;
854         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
855                 return NULL;
856
857         /* Find first affected segment... */
858
859         resid = offset;
860         __bio_for_each_segment(bv, bio_src, idx, 0) {
861                 if (resid < bv->bv_len)
862                         break;
863                 resid -= bv->bv_len;
864         }
865         voff = resid;
866
867         /* ...and the last affected segment */
868
869         resid += len;
870         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
871                 if (resid <= bv->bv_len)
872                         break;
873                 resid -= bv->bv_len;
874         }
875         vcnt = end_idx - idx + 1;
876
877         /* Build the clone */
878
879         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
880         if (!bio)
881                 return NULL;    /* ENOMEM */
882
883         bio->bi_bdev = bio_src->bi_bdev;
884         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
885         bio->bi_rw = bio_src->bi_rw;
886         bio->bi_flags |= 1 << BIO_CLONED;
887
888         /*
889          * Copy over our part of the bio_vec, then update the first
890          * and last (or only) entries.
891          */
892         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
893                         vcnt * sizeof (struct bio_vec));
894         bio->bi_io_vec[0].bv_offset += voff;
895         if (vcnt > 1) {
896                 bio->bi_io_vec[0].bv_len -= voff;
897                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
898         } else {
899                 bio->bi_io_vec[0].bv_len = len;
900         }
901
902         bio->bi_vcnt = vcnt;
903         bio->bi_size = len;
904         bio->bi_idx = 0;
905
906         return bio;
907 }
908
909 /*
910  * Clone a portion of a bio chain, starting at the given byte offset
911  * into the first bio in the source chain and continuing for the
912  * number of bytes indicated.  The result is another bio chain of
913  * exactly the given length, or a null pointer on error.
914  *
915  * The bio_src and offset parameters are both in-out.  On entry they
916  * refer to the first source bio and the offset into that bio where
917  * the start of data to be cloned is located.
918  *
919  * On return, bio_src is updated to refer to the bio in the source
920  * chain that contains first un-cloned byte, and *offset will
921  * contain the offset of that byte within that bio.
922  */
923 static struct bio *bio_chain_clone_range(struct bio **bio_src,
924                                         unsigned int *offset,
925                                         unsigned int len,
926                                         gfp_t gfpmask)
927 {
928         struct bio *bi = *bio_src;
929         unsigned int off = *offset;
930         struct bio *chain = NULL;
931         struct bio **end;
932
933         /* Build up a chain of clone bios up to the limit */
934
935         if (!bi || off >= bi->bi_size || !len)
936                 return NULL;            /* Nothing to clone */
937
938         end = &chain;
939         while (len) {
940                 unsigned int bi_size;
941                 struct bio *bio;
942
943                 if (!bi)
944                         goto out_err;   /* EINVAL; ran out of bio's */
945                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
946                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
947                 if (!bio)
948                         goto out_err;   /* ENOMEM */
949
950                 *end = bio;
951                 end = &bio->bi_next;
952
953                 off += bi_size;
954                 if (off == bi->bi_size) {
955                         bi = bi->bi_next;
956                         off = 0;
957                 }
958                 len -= bi_size;
959         }
960         *bio_src = bi;
961         *offset = off;
962
963         return chain;
964 out_err:
965         bio_chain_put(chain);
966
967         return NULL;
968 }
969
970 /*
971  * helpers for osd request op vectors.
972  */
973 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
974                                         int opcode, u32 payload_len)
975 {
976         struct ceph_osd_req_op *ops;
977
978         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
979         if (!ops)
980                 return NULL;
981
982         ops[0].op = opcode;
983
984         /*
985          * op extent offset and length will be set later on
986          * in calc_raw_layout()
987          */
988         ops[0].payload_len = payload_len;
989
990         return ops;
991 }
992
993 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
994 {
995         kfree(ops);
996 }
997
998 static void rbd_coll_end_req_index(struct request *rq,
999                                    struct rbd_req_coll *coll,
1000                                    int index,
1001                                    int ret, u64 len)
1002 {
1003         struct request_queue *q;
1004         int min, max, i;
1005
1006         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1007              coll, index, ret, (unsigned long long) len);
1008
1009         if (!rq)
1010                 return;
1011
1012         if (!coll) {
1013                 blk_end_request(rq, ret, len);
1014                 return;
1015         }
1016
1017         q = rq->q;
1018
1019         spin_lock_irq(q->queue_lock);
1020         coll->status[index].done = 1;
1021         coll->status[index].rc = ret;
1022         coll->status[index].bytes = len;
1023         max = min = coll->num_done;
1024         while (max < coll->total && coll->status[max].done)
1025                 max++;
1026
1027         for (i = min; i<max; i++) {
1028                 __blk_end_request(rq, coll->status[i].rc,
1029                                   coll->status[i].bytes);
1030                 coll->num_done++;
1031                 kref_put(&coll->kref, rbd_coll_release);
1032         }
1033         spin_unlock_irq(q->queue_lock);
1034 }
1035
1036 static void rbd_coll_end_req(struct rbd_request *req,
1037                              int ret, u64 len)
1038 {
1039         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1040 }
1041
1042 /*
1043  * Send ceph osd request
1044  */
1045 static int rbd_do_request(struct request *rq,
1046                           struct rbd_device *rbd_dev,
1047                           struct ceph_snap_context *snapc,
1048                           u64 snapid,
1049                           const char *object_name, u64 ofs, u64 len,
1050                           struct bio *bio,
1051                           struct page **pages,
1052                           int num_pages,
1053                           int flags,
1054                           struct ceph_osd_req_op *ops,
1055                           struct rbd_req_coll *coll,
1056                           int coll_index,
1057                           void (*rbd_cb)(struct ceph_osd_request *req,
1058                                          struct ceph_msg *msg),
1059                           struct ceph_osd_request **linger_req,
1060                           u64 *ver)
1061 {
1062         struct ceph_osd_request *req;
1063         struct ceph_file_layout *layout;
1064         int ret;
1065         u64 bno;
1066         struct timespec mtime = CURRENT_TIME;
1067         struct rbd_request *req_data;
1068         struct ceph_osd_request_head *reqhead;
1069         struct ceph_osd_client *osdc;
1070
1071         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1072         if (!req_data) {
1073                 if (coll)
1074                         rbd_coll_end_req_index(rq, coll, coll_index,
1075                                                -ENOMEM, len);
1076                 return -ENOMEM;
1077         }
1078
1079         if (coll) {
1080                 req_data->coll = coll;
1081                 req_data->coll_index = coll_index;
1082         }
1083
1084         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1085                 object_name, (unsigned long long) ofs,
1086                 (unsigned long long) len, coll, coll_index);
1087
1088         osdc = &rbd_dev->rbd_client->client->osdc;
1089         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1090                                         false, GFP_NOIO, pages, bio);
1091         if (!req) {
1092                 ret = -ENOMEM;
1093                 goto done_pages;
1094         }
1095
1096         req->r_callback = rbd_cb;
1097
1098         req_data->rq = rq;
1099         req_data->bio = bio;
1100         req_data->pages = pages;
1101         req_data->len = len;
1102
1103         req->r_priv = req_data;
1104
1105         reqhead = req->r_request->front.iov_base;
1106         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1107
1108         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1109         req->r_oid_len = strlen(req->r_oid);
1110
1111         layout = &req->r_file_layout;
1112         memset(layout, 0, sizeof(*layout));
1113         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1114         layout->fl_stripe_count = cpu_to_le32(1);
1115         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1116         layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
1117         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1118                                    req, ops);
1119         rbd_assert(ret == 0);
1120
1121         ceph_osdc_build_request(req, ofs, &len,
1122                                 ops,
1123                                 snapc,
1124                                 &mtime,
1125                                 req->r_oid, req->r_oid_len);
1126
1127         if (linger_req) {
1128                 ceph_osdc_set_request_linger(osdc, req);
1129                 *linger_req = req;
1130         }
1131
1132         ret = ceph_osdc_start_request(osdc, req, false);
1133         if (ret < 0)
1134                 goto done_err;
1135
1136         if (!rbd_cb) {
1137                 ret = ceph_osdc_wait_request(osdc, req);
1138                 if (ver)
1139                         *ver = le64_to_cpu(req->r_reassert_version.version);
1140                 dout("reassert_ver=%llu\n",
1141                         (unsigned long long)
1142                                 le64_to_cpu(req->r_reassert_version.version));
1143                 ceph_osdc_put_request(req);
1144         }
1145         return ret;
1146
1147 done_err:
1148         bio_chain_put(req_data->bio);
1149         ceph_osdc_put_request(req);
1150 done_pages:
1151         rbd_coll_end_req(req_data, ret, len);
1152         kfree(req_data);
1153         return ret;
1154 }
1155
1156 /*
1157  * Ceph osd op callback
1158  */
1159 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1160 {
1161         struct rbd_request *req_data = req->r_priv;
1162         struct ceph_osd_reply_head *replyhead;
1163         struct ceph_osd_op *op;
1164         __s32 rc;
1165         u64 bytes;
1166         int read_op;
1167
1168         /* parse reply */
1169         replyhead = msg->front.iov_base;
1170         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1171         op = (void *)(replyhead + 1);
1172         rc = le32_to_cpu(replyhead->result);
1173         bytes = le64_to_cpu(op->extent.length);
1174         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1175
1176         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1177                 (unsigned long long) bytes, read_op, (int) rc);
1178
1179         if (rc == -ENOENT && read_op) {
1180                 zero_bio_chain(req_data->bio, 0);
1181                 rc = 0;
1182         } else if (rc == 0 && read_op && bytes < req_data->len) {
1183                 zero_bio_chain(req_data->bio, bytes);
1184                 bytes = req_data->len;
1185         }
1186
1187         rbd_coll_end_req(req_data, rc, bytes);
1188
1189         if (req_data->bio)
1190                 bio_chain_put(req_data->bio);
1191
1192         ceph_osdc_put_request(req);
1193         kfree(req_data);
1194 }
1195
1196 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1197 {
1198         ceph_osdc_put_request(req);
1199 }
1200
1201 /*
1202  * Do a synchronous ceph osd operation
1203  */
1204 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1205                            struct ceph_snap_context *snapc,
1206                            u64 snapid,
1207                            int flags,
1208                            struct ceph_osd_req_op *ops,
1209                            const char *object_name,
1210                            u64 ofs, u64 inbound_size,
1211                            char *inbound,
1212                            struct ceph_osd_request **linger_req,
1213                            u64 *ver)
1214 {
1215         int ret;
1216         struct page **pages;
1217         int num_pages;
1218
1219         rbd_assert(ops != NULL);
1220
1221         num_pages = calc_pages_for(ofs, inbound_size);
1222         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1223         if (IS_ERR(pages))
1224                 return PTR_ERR(pages);
1225
1226         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1227                           object_name, ofs, inbound_size, NULL,
1228                           pages, num_pages,
1229                           flags,
1230                           ops,
1231                           NULL, 0,
1232                           NULL,
1233                           linger_req, ver);
1234         if (ret < 0)
1235                 goto done;
1236
1237         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1238                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1239
1240 done:
1241         ceph_release_page_vector(pages, num_pages);
1242         return ret;
1243 }
1244
1245 /*
1246  * Do an asynchronous ceph osd operation
1247  */
1248 static int rbd_do_op(struct request *rq,
1249                      struct rbd_device *rbd_dev,
1250                      struct ceph_snap_context *snapc,
1251                      u64 ofs, u64 len,
1252                      struct bio *bio,
1253                      struct rbd_req_coll *coll,
1254                      int coll_index)
1255 {
1256         char *seg_name;
1257         u64 seg_ofs;
1258         u64 seg_len;
1259         int ret;
1260         struct ceph_osd_req_op *ops;
1261         u32 payload_len;
1262         int opcode;
1263         int flags;
1264         u64 snapid;
1265
1266         seg_name = rbd_segment_name(rbd_dev, ofs);
1267         if (!seg_name)
1268                 return -ENOMEM;
1269         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1270         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1271
1272         if (rq_data_dir(rq) == WRITE) {
1273                 opcode = CEPH_OSD_OP_WRITE;
1274                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1275                 snapid = CEPH_NOSNAP;
1276                 payload_len = seg_len;
1277         } else {
1278                 opcode = CEPH_OSD_OP_READ;
1279                 flags = CEPH_OSD_FLAG_READ;
1280                 snapc = NULL;
1281                 snapid = rbd_dev->snap_id;
1282                 payload_len = 0;
1283         }
1284
1285         ret = -ENOMEM;
1286         ops = rbd_create_rw_ops(1, opcode, payload_len);
1287         if (!ops)
1288                 goto done;
1289
1290         /* we've taken care of segment sizes earlier when we
1291            cloned the bios. We should never have a segment
1292            truncated at this point */
1293         rbd_assert(seg_len == len);
1294
1295         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1296                              seg_name, seg_ofs, seg_len,
1297                              bio,
1298                              NULL, 0,
1299                              flags,
1300                              ops,
1301                              coll, coll_index,
1302                              rbd_req_cb, 0, NULL);
1303
1304         rbd_destroy_ops(ops);
1305 done:
1306         kfree(seg_name);
1307         return ret;
1308 }
1309
1310 /*
1311  * Request sync osd read
1312  */
1313 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1314                           u64 snapid,
1315                           const char *object_name,
1316                           u64 ofs, u64 len,
1317                           char *buf,
1318                           u64 *ver)
1319 {
1320         struct ceph_osd_req_op *ops;
1321         int ret;
1322
1323         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1324         if (!ops)
1325                 return -ENOMEM;
1326
1327         ret = rbd_req_sync_op(rbd_dev, NULL,
1328                                snapid,
1329                                CEPH_OSD_FLAG_READ,
1330                                ops, object_name, ofs, len, buf, NULL, ver);
1331         rbd_destroy_ops(ops);
1332
1333         return ret;
1334 }
1335
1336 /*
1337  * Request sync osd watch
1338  */
1339 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1340                                    u64 ver,
1341                                    u64 notify_id)
1342 {
1343         struct ceph_osd_req_op *ops;
1344         int ret;
1345
1346         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1347         if (!ops)
1348                 return -ENOMEM;
1349
1350         ops[0].watch.ver = cpu_to_le64(ver);
1351         ops[0].watch.cookie = notify_id;
1352         ops[0].watch.flag = 0;
1353
1354         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1355                           rbd_dev->header_name, 0, 0, NULL,
1356                           NULL, 0,
1357                           CEPH_OSD_FLAG_READ,
1358                           ops,
1359                           NULL, 0,
1360                           rbd_simple_req_cb, 0, NULL);
1361
1362         rbd_destroy_ops(ops);
1363         return ret;
1364 }
1365
1366 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1367 {
1368         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1369         u64 hver;
1370         int rc;
1371
1372         if (!rbd_dev)
1373                 return;
1374
1375         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1376                 rbd_dev->header_name, (unsigned long long) notify_id,
1377                 (unsigned int) opcode);
1378         rc = rbd_dev_refresh(rbd_dev, &hver);
1379         if (rc)
1380                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1381                            " update snaps: %d\n", rbd_dev->major, rc);
1382
1383         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1384 }
1385
1386 /*
1387  * Request sync osd watch
1388  */
1389 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1390 {
1391         struct ceph_osd_req_op *ops;
1392         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1393         int ret;
1394
1395         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1396         if (!ops)
1397                 return -ENOMEM;
1398
1399         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1400                                      (void *)rbd_dev, &rbd_dev->watch_event);
1401         if (ret < 0)
1402                 goto fail;
1403
1404         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1405         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1406         ops[0].watch.flag = 1;
1407
1408         ret = rbd_req_sync_op(rbd_dev, NULL,
1409                               CEPH_NOSNAP,
1410                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1411                               ops,
1412                               rbd_dev->header_name,
1413                               0, 0, NULL,
1414                               &rbd_dev->watch_request, NULL);
1415
1416         if (ret < 0)
1417                 goto fail_event;
1418
1419         rbd_destroy_ops(ops);
1420         return 0;
1421
1422 fail_event:
1423         ceph_osdc_cancel_event(rbd_dev->watch_event);
1424         rbd_dev->watch_event = NULL;
1425 fail:
1426         rbd_destroy_ops(ops);
1427         return ret;
1428 }
1429
1430 /*
1431  * Request sync osd unwatch
1432  */
1433 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1434 {
1435         struct ceph_osd_req_op *ops;
1436         int ret;
1437
1438         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1439         if (!ops)
1440                 return -ENOMEM;
1441
1442         ops[0].watch.ver = 0;
1443         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1444         ops[0].watch.flag = 0;
1445
1446         ret = rbd_req_sync_op(rbd_dev, NULL,
1447                               CEPH_NOSNAP,
1448                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1449                               ops,
1450                               rbd_dev->header_name,
1451                               0, 0, NULL, NULL, NULL);
1452
1453
1454         rbd_destroy_ops(ops);
1455         ceph_osdc_cancel_event(rbd_dev->watch_event);
1456         rbd_dev->watch_event = NULL;
1457         return ret;
1458 }
1459
1460 /*
1461  * Synchronous osd object method call
1462  */
1463 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1464                              const char *object_name,
1465                              const char *class_name,
1466                              const char *method_name,
1467                              const char *outbound,
1468                              size_t outbound_size,
1469                              char *inbound,
1470                              size_t inbound_size,
1471                              int flags,
1472                              u64 *ver)
1473 {
1474         struct ceph_osd_req_op *ops;
1475         int class_name_len = strlen(class_name);
1476         int method_name_len = strlen(method_name);
1477         int payload_size;
1478         int ret;
1479
1480         /*
1481          * Any input parameters required by the method we're calling
1482          * will be sent along with the class and method names as
1483          * part of the message payload.  That data and its size are
1484          * supplied via the indata and indata_len fields (named from
1485          * the perspective of the server side) in the OSD request
1486          * operation.
1487          */
1488         payload_size = class_name_len + method_name_len + outbound_size;
1489         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1490         if (!ops)
1491                 return -ENOMEM;
1492
1493         ops[0].cls.class_name = class_name;
1494         ops[0].cls.class_len = (__u8) class_name_len;
1495         ops[0].cls.method_name = method_name;
1496         ops[0].cls.method_len = (__u8) method_name_len;
1497         ops[0].cls.argc = 0;
1498         ops[0].cls.indata = outbound;
1499         ops[0].cls.indata_len = outbound_size;
1500
1501         ret = rbd_req_sync_op(rbd_dev, NULL,
1502                                CEPH_NOSNAP,
1503                                flags, ops,
1504                                object_name, 0, inbound_size, inbound,
1505                                NULL, ver);
1506
1507         rbd_destroy_ops(ops);
1508
1509         dout("cls_exec returned %d\n", ret);
1510         return ret;
1511 }
1512
1513 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1514 {
1515         struct rbd_req_coll *coll =
1516                         kzalloc(sizeof(struct rbd_req_coll) +
1517                                 sizeof(struct rbd_req_status) * num_reqs,
1518                                 GFP_ATOMIC);
1519
1520         if (!coll)
1521                 return NULL;
1522         coll->total = num_reqs;
1523         kref_init(&coll->kref);
1524         return coll;
1525 }
1526
1527 /*
1528  * block device queue callback
1529  */
1530 static void rbd_rq_fn(struct request_queue *q)
1531 {
1532         struct rbd_device *rbd_dev = q->queuedata;
1533         struct request *rq;
1534
1535         while ((rq = blk_fetch_request(q))) {
1536                 struct bio *bio;
1537                 bool do_write;
1538                 unsigned int size;
1539                 u64 ofs;
1540                 int num_segs, cur_seg = 0;
1541                 struct rbd_req_coll *coll;
1542                 struct ceph_snap_context *snapc;
1543                 unsigned int bio_offset;
1544
1545                 dout("fetched request\n");
1546
1547                 /* filter out block requests we don't understand */
1548                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1549                         __blk_end_request_all(rq, 0);
1550                         continue;
1551                 }
1552
1553                 /* deduce our operation (read, write) */
1554                 do_write = (rq_data_dir(rq) == WRITE);
1555                 if (do_write && rbd_dev->mapping.read_only) {
1556                         __blk_end_request_all(rq, -EROFS);
1557                         continue;
1558                 }
1559
1560                 spin_unlock_irq(q->queue_lock);
1561
1562                 down_read(&rbd_dev->header_rwsem);
1563
1564                 if (!rbd_dev->exists) {
1565                         rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
1566                         up_read(&rbd_dev->header_rwsem);
1567                         dout("request for non-existent snapshot");
1568                         spin_lock_irq(q->queue_lock);
1569                         __blk_end_request_all(rq, -ENXIO);
1570                         continue;
1571                 }
1572
1573                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1574
1575                 up_read(&rbd_dev->header_rwsem);
1576
1577                 size = blk_rq_bytes(rq);
1578                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1579                 bio = rq->bio;
1580
1581                 dout("%s 0x%x bytes at 0x%llx\n",
1582                      do_write ? "write" : "read",
1583                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1584
1585                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1586                 if (num_segs <= 0) {
1587                         spin_lock_irq(q->queue_lock);
1588                         __blk_end_request_all(rq, num_segs);
1589                         ceph_put_snap_context(snapc);
1590                         continue;
1591                 }
1592                 coll = rbd_alloc_coll(num_segs);
1593                 if (!coll) {
1594                         spin_lock_irq(q->queue_lock);
1595                         __blk_end_request_all(rq, -ENOMEM);
1596                         ceph_put_snap_context(snapc);
1597                         continue;
1598                 }
1599
1600                 bio_offset = 0;
1601                 do {
1602                         u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1603                         unsigned int chain_size;
1604                         struct bio *bio_chain;
1605
1606                         BUG_ON(limit > (u64) UINT_MAX);
1607                         chain_size = (unsigned int) limit;
1608                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1609
1610                         kref_get(&coll->kref);
1611
1612                         /* Pass a cloned bio chain via an osd request */
1613
1614                         bio_chain = bio_chain_clone_range(&bio,
1615                                                 &bio_offset, chain_size,
1616                                                 GFP_ATOMIC);
1617                         if (bio_chain)
1618                                 (void) rbd_do_op(rq, rbd_dev, snapc,
1619                                                 ofs, chain_size,
1620                                                 bio_chain, coll, cur_seg);
1621                         else
1622                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1623                                                        -ENOMEM, chain_size);
1624                         size -= chain_size;
1625                         ofs += chain_size;
1626
1627                         cur_seg++;
1628                 } while (size > 0);
1629                 kref_put(&coll->kref, rbd_coll_release);
1630
1631                 spin_lock_irq(q->queue_lock);
1632
1633                 ceph_put_snap_context(snapc);
1634         }
1635 }
1636
1637 /*
1638  * a queue callback. Makes sure that we don't create a bio that spans across
1639  * multiple osd objects. One exception would be with a single page bios,
1640  * which we handle later at bio_chain_clone_range()
1641  */
1642 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1643                           struct bio_vec *bvec)
1644 {
1645         struct rbd_device *rbd_dev = q->queuedata;
1646         sector_t sector_offset;
1647         sector_t sectors_per_obj;
1648         sector_t obj_sector_offset;
1649         int ret;
1650
1651         /*
1652          * Find how far into its rbd object the partition-relative
1653          * bio start sector is to offset relative to the enclosing
1654          * device.
1655          */
1656         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1657         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1658         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1659
1660         /*
1661          * Compute the number of bytes from that offset to the end
1662          * of the object.  Account for what's already used by the bio.
1663          */
1664         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1665         if (ret > bmd->bi_size)
1666                 ret -= bmd->bi_size;
1667         else
1668                 ret = 0;
1669
1670         /*
1671          * Don't send back more than was asked for.  And if the bio
1672          * was empty, let the whole thing through because:  "Note
1673          * that a block device *must* allow a single page to be
1674          * added to an empty bio."
1675          */
1676         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1677         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1678                 ret = (int) bvec->bv_len;
1679
1680         return ret;
1681 }
1682
1683 static void rbd_free_disk(struct rbd_device *rbd_dev)
1684 {
1685         struct gendisk *disk = rbd_dev->disk;
1686
1687         if (!disk)
1688                 return;
1689
1690         if (disk->flags & GENHD_FL_UP)
1691                 del_gendisk(disk);
1692         if (disk->queue)
1693                 blk_cleanup_queue(disk->queue);
1694         put_disk(disk);
1695 }
1696
1697 /*
1698  * Read the complete header for the given rbd device.
1699  *
1700  * Returns a pointer to a dynamically-allocated buffer containing
1701  * the complete and validated header.  Caller can pass the address
1702  * of a variable that will be filled in with the version of the
1703  * header object at the time it was read.
1704  *
1705  * Returns a pointer-coded errno if a failure occurs.
1706  */
1707 static struct rbd_image_header_ondisk *
1708 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1709 {
1710         struct rbd_image_header_ondisk *ondisk = NULL;
1711         u32 snap_count = 0;
1712         u64 names_size = 0;
1713         u32 want_count;
1714         int ret;
1715
1716         /*
1717          * The complete header will include an array of its 64-bit
1718          * snapshot ids, followed by the names of those snapshots as
1719          * a contiguous block of NUL-terminated strings.  Note that
1720          * the number of snapshots could change by the time we read
1721          * it in, in which case we re-read it.
1722          */
1723         do {
1724                 size_t size;
1725
1726                 kfree(ondisk);
1727
1728                 size = sizeof (*ondisk);
1729                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1730                 size += names_size;
1731                 ondisk = kmalloc(size, GFP_KERNEL);
1732                 if (!ondisk)
1733                         return ERR_PTR(-ENOMEM);
1734
1735                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1736                                        rbd_dev->header_name,
1737                                        0, size,
1738                                        (char *) ondisk, version);
1739
1740                 if (ret < 0)
1741                         goto out_err;
1742                 if (WARN_ON((size_t) ret < size)) {
1743                         ret = -ENXIO;
1744                         pr_warning("short header read for image %s"
1745                                         " (want %zd got %d)\n",
1746                                 rbd_dev->image_name, size, ret);
1747                         goto out_err;
1748                 }
1749                 if (!rbd_dev_ondisk_valid(ondisk)) {
1750                         ret = -ENXIO;
1751                         pr_warning("invalid header for image %s\n",
1752                                 rbd_dev->image_name);
1753                         goto out_err;
1754                 }
1755
1756                 names_size = le64_to_cpu(ondisk->snap_names_len);
1757                 want_count = snap_count;
1758                 snap_count = le32_to_cpu(ondisk->snap_count);
1759         } while (snap_count != want_count);
1760
1761         return ondisk;
1762
1763 out_err:
1764         kfree(ondisk);
1765
1766         return ERR_PTR(ret);
1767 }
1768
1769 /*
1770  * reload the ondisk the header
1771  */
1772 static int rbd_read_header(struct rbd_device *rbd_dev,
1773                            struct rbd_image_header *header)
1774 {
1775         struct rbd_image_header_ondisk *ondisk;
1776         u64 ver = 0;
1777         int ret;
1778
1779         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1780         if (IS_ERR(ondisk))
1781                 return PTR_ERR(ondisk);
1782         ret = rbd_header_from_disk(header, ondisk);
1783         if (ret >= 0)
1784                 header->obj_version = ver;
1785         kfree(ondisk);
1786
1787         return ret;
1788 }
1789
1790 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1791 {
1792         struct rbd_snap *snap;
1793         struct rbd_snap *next;
1794
1795         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1796                 rbd_remove_snap_dev(snap);
1797 }
1798
1799 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1800 {
1801         sector_t size;
1802
1803         if (rbd_dev->snap_id != CEPH_NOSNAP)
1804                 return;
1805
1806         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1807         dout("setting size to %llu sectors", (unsigned long long) size);
1808         rbd_dev->mapping.size = (u64) size;
1809         set_capacity(rbd_dev->disk, size);
1810 }
1811
1812 /*
1813  * only read the first part of the ondisk header, without the snaps info
1814  */
1815 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1816 {
1817         int ret;
1818         struct rbd_image_header h;
1819
1820         ret = rbd_read_header(rbd_dev, &h);
1821         if (ret < 0)
1822                 return ret;
1823
1824         down_write(&rbd_dev->header_rwsem);
1825
1826         /* Update image size, and check for resize of mapped image */
1827         rbd_dev->header.image_size = h.image_size;
1828         rbd_update_mapping_size(rbd_dev);
1829
1830         /* rbd_dev->header.object_prefix shouldn't change */
1831         kfree(rbd_dev->header.snap_sizes);
1832         kfree(rbd_dev->header.snap_names);
1833         /* osd requests may still refer to snapc */
1834         ceph_put_snap_context(rbd_dev->header.snapc);
1835
1836         if (hver)
1837                 *hver = h.obj_version;
1838         rbd_dev->header.obj_version = h.obj_version;
1839         rbd_dev->header.image_size = h.image_size;
1840         rbd_dev->header.snapc = h.snapc;
1841         rbd_dev->header.snap_names = h.snap_names;
1842         rbd_dev->header.snap_sizes = h.snap_sizes;
1843         /* Free the extra copy of the object prefix */
1844         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1845         kfree(h.object_prefix);
1846
1847         ret = rbd_dev_snaps_update(rbd_dev);
1848         if (!ret)
1849                 ret = rbd_dev_snaps_register(rbd_dev);
1850
1851         up_write(&rbd_dev->header_rwsem);
1852
1853         return ret;
1854 }
1855
1856 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1857 {
1858         int ret;
1859
1860         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1861         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1862         if (rbd_dev->image_format == 1)
1863                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1864         else
1865                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1866         mutex_unlock(&ctl_mutex);
1867
1868         return ret;
1869 }
1870
1871 static int rbd_init_disk(struct rbd_device *rbd_dev)
1872 {
1873         struct gendisk *disk;
1874         struct request_queue *q;
1875         u64 segment_size;
1876
1877         /* create gendisk info */
1878         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1879         if (!disk)
1880                 return -ENOMEM;
1881
1882         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1883                  rbd_dev->dev_id);
1884         disk->major = rbd_dev->major;
1885         disk->first_minor = 0;
1886         disk->fops = &rbd_bd_ops;
1887         disk->private_data = rbd_dev;
1888
1889         /* init rq */
1890         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1891         if (!q)
1892                 goto out_disk;
1893
1894         /* We use the default size, but let's be explicit about it. */
1895         blk_queue_physical_block_size(q, SECTOR_SIZE);
1896
1897         /* set io sizes to object size */
1898         segment_size = rbd_obj_bytes(&rbd_dev->header);
1899         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1900         blk_queue_max_segment_size(q, segment_size);
1901         blk_queue_io_min(q, segment_size);
1902         blk_queue_io_opt(q, segment_size);
1903
1904         blk_queue_merge_bvec(q, rbd_merge_bvec);
1905         disk->queue = q;
1906
1907         q->queuedata = rbd_dev;
1908
1909         rbd_dev->disk = disk;
1910
1911         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1912
1913         return 0;
1914 out_disk:
1915         put_disk(disk);
1916
1917         return -ENOMEM;
1918 }
1919
1920 /*
1921   sysfs
1922 */
1923
1924 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1925 {
1926         return container_of(dev, struct rbd_device, dev);
1927 }
1928
1929 static ssize_t rbd_size_show(struct device *dev,
1930                              struct device_attribute *attr, char *buf)
1931 {
1932         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1933         sector_t size;
1934
1935         down_read(&rbd_dev->header_rwsem);
1936         size = get_capacity(rbd_dev->disk);
1937         up_read(&rbd_dev->header_rwsem);
1938
1939         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1940 }
1941
1942 /*
1943  * Note this shows the features for whatever's mapped, which is not
1944  * necessarily the base image.
1945  */
1946 static ssize_t rbd_features_show(struct device *dev,
1947                              struct device_attribute *attr, char *buf)
1948 {
1949         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1950
1951         return sprintf(buf, "0x%016llx\n",
1952                         (unsigned long long) rbd_dev->mapping.features);
1953 }
1954
1955 static ssize_t rbd_major_show(struct device *dev,
1956                               struct device_attribute *attr, char *buf)
1957 {
1958         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959
1960         return sprintf(buf, "%d\n", rbd_dev->major);
1961 }
1962
1963 static ssize_t rbd_client_id_show(struct device *dev,
1964                                   struct device_attribute *attr, char *buf)
1965 {
1966         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1967
1968         return sprintf(buf, "client%lld\n",
1969                         ceph_client_id(rbd_dev->rbd_client->client));
1970 }
1971
1972 static ssize_t rbd_pool_show(struct device *dev,
1973                              struct device_attribute *attr, char *buf)
1974 {
1975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976
1977         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1978 }
1979
1980 static ssize_t rbd_pool_id_show(struct device *dev,
1981                              struct device_attribute *attr, char *buf)
1982 {
1983         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984
1985         return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
1986 }
1987
1988 static ssize_t rbd_name_show(struct device *dev,
1989                              struct device_attribute *attr, char *buf)
1990 {
1991         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1992
1993         return sprintf(buf, "%s\n", rbd_dev->image_name);
1994 }
1995
1996 static ssize_t rbd_image_id_show(struct device *dev,
1997                              struct device_attribute *attr, char *buf)
1998 {
1999         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2000
2001         return sprintf(buf, "%s\n", rbd_dev->image_id);
2002 }
2003
2004 /*
2005  * Shows the name of the currently-mapped snapshot (or
2006  * RBD_SNAP_HEAD_NAME for the base image).
2007  */
2008 static ssize_t rbd_snap_show(struct device *dev,
2009                              struct device_attribute *attr,
2010                              char *buf)
2011 {
2012         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2013
2014         return sprintf(buf, "%s\n", rbd_dev->snap_name);
2015 }
2016
2017 static ssize_t rbd_image_refresh(struct device *dev,
2018                                  struct device_attribute *attr,
2019                                  const char *buf,
2020                                  size_t size)
2021 {
2022         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2023         int ret;
2024
2025         ret = rbd_dev_refresh(rbd_dev, NULL);
2026
2027         return ret < 0 ? ret : size;
2028 }
2029
2030 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2031 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2032 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2033 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2034 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2035 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2036 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2037 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2038 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2039 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2040
2041 static struct attribute *rbd_attrs[] = {
2042         &dev_attr_size.attr,
2043         &dev_attr_features.attr,
2044         &dev_attr_major.attr,
2045         &dev_attr_client_id.attr,
2046         &dev_attr_pool.attr,
2047         &dev_attr_pool_id.attr,
2048         &dev_attr_name.attr,
2049         &dev_attr_image_id.attr,
2050         &dev_attr_current_snap.attr,
2051         &dev_attr_refresh.attr,
2052         NULL
2053 };
2054
2055 static struct attribute_group rbd_attr_group = {
2056         .attrs = rbd_attrs,
2057 };
2058
2059 static const struct attribute_group *rbd_attr_groups[] = {
2060         &rbd_attr_group,
2061         NULL
2062 };
2063
2064 static void rbd_sysfs_dev_release(struct device *dev)
2065 {
2066 }
2067
2068 static struct device_type rbd_device_type = {
2069         .name           = "rbd",
2070         .groups         = rbd_attr_groups,
2071         .release        = rbd_sysfs_dev_release,
2072 };
2073
2074
2075 /*
2076   sysfs - snapshots
2077 */
2078
2079 static ssize_t rbd_snap_size_show(struct device *dev,
2080                                   struct device_attribute *attr,
2081                                   char *buf)
2082 {
2083         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2084
2085         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2086 }
2087
2088 static ssize_t rbd_snap_id_show(struct device *dev,
2089                                 struct device_attribute *attr,
2090                                 char *buf)
2091 {
2092         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2093
2094         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2095 }
2096
2097 static ssize_t rbd_snap_features_show(struct device *dev,
2098                                 struct device_attribute *attr,
2099                                 char *buf)
2100 {
2101         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2102
2103         return sprintf(buf, "0x%016llx\n",
2104                         (unsigned long long) snap->features);
2105 }
2106
2107 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2108 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2109 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2110
2111 static struct attribute *rbd_snap_attrs[] = {
2112         &dev_attr_snap_size.attr,
2113         &dev_attr_snap_id.attr,
2114         &dev_attr_snap_features.attr,
2115         NULL,
2116 };
2117
2118 static struct attribute_group rbd_snap_attr_group = {
2119         .attrs = rbd_snap_attrs,
2120 };
2121
2122 static void rbd_snap_dev_release(struct device *dev)
2123 {
2124         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2125         kfree(snap->name);
2126         kfree(snap);
2127 }
2128
2129 static const struct attribute_group *rbd_snap_attr_groups[] = {
2130         &rbd_snap_attr_group,
2131         NULL
2132 };
2133
2134 static struct device_type rbd_snap_device_type = {
2135         .groups         = rbd_snap_attr_groups,
2136         .release        = rbd_snap_dev_release,
2137 };
2138
2139 static bool rbd_snap_registered(struct rbd_snap *snap)
2140 {
2141         bool ret = snap->dev.type == &rbd_snap_device_type;
2142         bool reg = device_is_registered(&snap->dev);
2143
2144         rbd_assert(!ret ^ reg);
2145
2146         return ret;
2147 }
2148
2149 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2150 {
2151         list_del(&snap->node);
2152         if (device_is_registered(&snap->dev))
2153                 device_unregister(&snap->dev);
2154 }
2155
2156 static int rbd_register_snap_dev(struct rbd_snap *snap,
2157                                   struct device *parent)
2158 {
2159         struct device *dev = &snap->dev;
2160         int ret;
2161
2162         dev->type = &rbd_snap_device_type;
2163         dev->parent = parent;
2164         dev->release = rbd_snap_dev_release;
2165         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2166         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2167
2168         ret = device_register(dev);
2169
2170         return ret;
2171 }
2172
2173 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2174                                                 const char *snap_name,
2175                                                 u64 snap_id, u64 snap_size,
2176                                                 u64 snap_features)
2177 {
2178         struct rbd_snap *snap;
2179         int ret;
2180
2181         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2182         if (!snap)
2183                 return ERR_PTR(-ENOMEM);
2184
2185         ret = -ENOMEM;
2186         snap->name = kstrdup(snap_name, GFP_KERNEL);
2187         if (!snap->name)
2188                 goto err;
2189
2190         snap->id = snap_id;
2191         snap->size = snap_size;
2192         snap->features = snap_features;
2193
2194         return snap;
2195
2196 err:
2197         kfree(snap->name);
2198         kfree(snap);
2199
2200         return ERR_PTR(ret);
2201 }
2202
2203 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2204                 u64 *snap_size, u64 *snap_features)
2205 {
2206         char *snap_name;
2207
2208         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2209
2210         *snap_size = rbd_dev->header.snap_sizes[which];
2211         *snap_features = 0;     /* No features for v1 */
2212
2213         /* Skip over names until we find the one we are looking for */
2214
2215         snap_name = rbd_dev->header.snap_names;
2216         while (which--)
2217                 snap_name += strlen(snap_name) + 1;
2218
2219         return snap_name;
2220 }
2221
2222 /*
2223  * Get the size and object order for an image snapshot, or if
2224  * snap_id is CEPH_NOSNAP, gets this information for the base
2225  * image.
2226  */
2227 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2228                                 u8 *order, u64 *snap_size)
2229 {
2230         __le64 snapid = cpu_to_le64(snap_id);
2231         int ret;
2232         struct {
2233                 u8 order;
2234                 __le64 size;
2235         } __attribute__ ((packed)) size_buf = { 0 };
2236
2237         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2238                                 "rbd", "get_size",
2239                                 (char *) &snapid, sizeof (snapid),
2240                                 (char *) &size_buf, sizeof (size_buf),
2241                                 CEPH_OSD_FLAG_READ, NULL);
2242         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2243         if (ret < 0)
2244                 return ret;
2245
2246         *order = size_buf.order;
2247         *snap_size = le64_to_cpu(size_buf.size);
2248
2249         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2250                 (unsigned long long) snap_id, (unsigned int) *order,
2251                 (unsigned long long) *snap_size);
2252
2253         return 0;
2254 }
2255
2256 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2257 {
2258         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2259                                         &rbd_dev->header.obj_order,
2260                                         &rbd_dev->header.image_size);
2261 }
2262
2263 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2264 {
2265         void *reply_buf;
2266         int ret;
2267         void *p;
2268
2269         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2270         if (!reply_buf)
2271                 return -ENOMEM;
2272
2273         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2274                                 "rbd", "get_object_prefix",
2275                                 NULL, 0,
2276                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2277                                 CEPH_OSD_FLAG_READ, NULL);
2278         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2279         if (ret < 0)
2280                 goto out;
2281         ret = 0;    /* rbd_req_sync_exec() can return positive */
2282
2283         p = reply_buf;
2284         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2285                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2286                                                 NULL, GFP_NOIO);
2287
2288         if (IS_ERR(rbd_dev->header.object_prefix)) {
2289                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2290                 rbd_dev->header.object_prefix = NULL;
2291         } else {
2292                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2293         }
2294
2295 out:
2296         kfree(reply_buf);
2297
2298         return ret;
2299 }
2300
2301 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2302                 u64 *snap_features)
2303 {
2304         __le64 snapid = cpu_to_le64(snap_id);
2305         struct {
2306                 __le64 features;
2307                 __le64 incompat;
2308         } features_buf = { 0 };
2309         u64 incompat;
2310         int ret;
2311
2312         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2313                                 "rbd", "get_features",
2314                                 (char *) &snapid, sizeof (snapid),
2315                                 (char *) &features_buf, sizeof (features_buf),
2316                                 CEPH_OSD_FLAG_READ, NULL);
2317         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2318         if (ret < 0)
2319                 return ret;
2320
2321         incompat = le64_to_cpu(features_buf.incompat);
2322         if (incompat & ~RBD_FEATURES_ALL)
2323                 return -ENOTSUPP;
2324
2325         *snap_features = le64_to_cpu(features_buf.features);
2326
2327         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2328                 (unsigned long long) snap_id,
2329                 (unsigned long long) *snap_features,
2330                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2331
2332         return 0;
2333 }
2334
2335 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2336 {
2337         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2338                                                 &rbd_dev->header.features);
2339 }
2340
2341 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2342 {
2343         size_t size;
2344         int ret;
2345         void *reply_buf;
2346         void *p;
2347         void *end;
2348         u64 seq;
2349         u32 snap_count;
2350         struct ceph_snap_context *snapc;
2351         u32 i;
2352
2353         /*
2354          * We'll need room for the seq value (maximum snapshot id),
2355          * snapshot count, and array of that many snapshot ids.
2356          * For now we have a fixed upper limit on the number we're
2357          * prepared to receive.
2358          */
2359         size = sizeof (__le64) + sizeof (__le32) +
2360                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2361         reply_buf = kzalloc(size, GFP_KERNEL);
2362         if (!reply_buf)
2363                 return -ENOMEM;
2364
2365         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2366                                 "rbd", "get_snapcontext",
2367                                 NULL, 0,
2368                                 reply_buf, size,
2369                                 CEPH_OSD_FLAG_READ, ver);
2370         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2371         if (ret < 0)
2372                 goto out;
2373
2374         ret = -ERANGE;
2375         p = reply_buf;
2376         end = (char *) reply_buf + size;
2377         ceph_decode_64_safe(&p, end, seq, out);
2378         ceph_decode_32_safe(&p, end, snap_count, out);
2379
2380         /*
2381          * Make sure the reported number of snapshot ids wouldn't go
2382          * beyond the end of our buffer.  But before checking that,
2383          * make sure the computed size of the snapshot context we
2384          * allocate is representable in a size_t.
2385          */
2386         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2387                                  / sizeof (u64)) {
2388                 ret = -EINVAL;
2389                 goto out;
2390         }
2391         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2392                 goto out;
2393
2394         size = sizeof (struct ceph_snap_context) +
2395                                 snap_count * sizeof (snapc->snaps[0]);
2396         snapc = kmalloc(size, GFP_KERNEL);
2397         if (!snapc) {
2398                 ret = -ENOMEM;
2399                 goto out;
2400         }
2401
2402         atomic_set(&snapc->nref, 1);
2403         snapc->seq = seq;
2404         snapc->num_snaps = snap_count;
2405         for (i = 0; i < snap_count; i++)
2406                 snapc->snaps[i] = ceph_decode_64(&p);
2407
2408         rbd_dev->header.snapc = snapc;
2409
2410         dout("  snap context seq = %llu, snap_count = %u\n",
2411                 (unsigned long long) seq, (unsigned int) snap_count);
2412
2413 out:
2414         kfree(reply_buf);
2415
2416         return 0;
2417 }
2418
2419 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2420 {
2421         size_t size;
2422         void *reply_buf;
2423         __le64 snap_id;
2424         int ret;
2425         void *p;
2426         void *end;
2427         size_t snap_name_len;
2428         char *snap_name;
2429
2430         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2431         reply_buf = kmalloc(size, GFP_KERNEL);
2432         if (!reply_buf)
2433                 return ERR_PTR(-ENOMEM);
2434
2435         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2436         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2437                                 "rbd", "get_snapshot_name",
2438                                 (char *) &snap_id, sizeof (snap_id),
2439                                 reply_buf, size,
2440                                 CEPH_OSD_FLAG_READ, NULL);
2441         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2442         if (ret < 0)
2443                 goto out;
2444
2445         p = reply_buf;
2446         end = (char *) reply_buf + size;
2447         snap_name_len = 0;
2448         snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2449                                 GFP_KERNEL);
2450         if (IS_ERR(snap_name)) {
2451                 ret = PTR_ERR(snap_name);
2452                 goto out;
2453         } else {
2454                 dout("  snap_id 0x%016llx snap_name = %s\n",
2455                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2456         }
2457         kfree(reply_buf);
2458
2459         return snap_name;
2460 out:
2461         kfree(reply_buf);
2462
2463         return ERR_PTR(ret);
2464 }
2465
2466 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2467                 u64 *snap_size, u64 *snap_features)
2468 {
2469         __le64 snap_id;
2470         u8 order;
2471         int ret;
2472
2473         snap_id = rbd_dev->header.snapc->snaps[which];
2474         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2475         if (ret)
2476                 return ERR_PTR(ret);
2477         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2478         if (ret)
2479                 return ERR_PTR(ret);
2480
2481         return rbd_dev_v2_snap_name(rbd_dev, which);
2482 }
2483
2484 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2485                 u64 *snap_size, u64 *snap_features)
2486 {
2487         if (rbd_dev->image_format == 1)
2488                 return rbd_dev_v1_snap_info(rbd_dev, which,
2489                                         snap_size, snap_features);
2490         if (rbd_dev->image_format == 2)
2491                 return rbd_dev_v2_snap_info(rbd_dev, which,
2492                                         snap_size, snap_features);
2493         return ERR_PTR(-EINVAL);
2494 }
2495
2496 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2497 {
2498         int ret;
2499         __u8 obj_order;
2500
2501         down_write(&rbd_dev->header_rwsem);
2502
2503         /* Grab old order first, to see if it changes */
2504
2505         obj_order = rbd_dev->header.obj_order,
2506         ret = rbd_dev_v2_image_size(rbd_dev);
2507         if (ret)
2508                 goto out;
2509         if (rbd_dev->header.obj_order != obj_order) {
2510                 ret = -EIO;
2511                 goto out;
2512         }
2513         rbd_update_mapping_size(rbd_dev);
2514
2515         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2516         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2517         if (ret)
2518                 goto out;
2519         ret = rbd_dev_snaps_update(rbd_dev);
2520         dout("rbd_dev_snaps_update returned %d\n", ret);
2521         if (ret)
2522                 goto out;
2523         ret = rbd_dev_snaps_register(rbd_dev);
2524         dout("rbd_dev_snaps_register returned %d\n", ret);
2525 out:
2526         up_write(&rbd_dev->header_rwsem);
2527
2528         return ret;
2529 }
2530
2531 /*
2532  * Scan the rbd device's current snapshot list and compare it to the
2533  * newly-received snapshot context.  Remove any existing snapshots
2534  * not present in the new snapshot context.  Add a new snapshot for
2535  * any snaphots in the snapshot context not in the current list.
2536  * And verify there are no changes to snapshots we already know
2537  * about.
2538  *
2539  * Assumes the snapshots in the snapshot context are sorted by
2540  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2541  * are also maintained in that order.)
2542  */
2543 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2544 {
2545         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2546         const u32 snap_count = snapc->num_snaps;
2547         struct list_head *head = &rbd_dev->snaps;
2548         struct list_head *links = head->next;
2549         u32 index = 0;
2550
2551         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2552         while (index < snap_count || links != head) {
2553                 u64 snap_id;
2554                 struct rbd_snap *snap;
2555                 char *snap_name;
2556                 u64 snap_size = 0;
2557                 u64 snap_features = 0;
2558
2559                 snap_id = index < snap_count ? snapc->snaps[index]
2560                                              : CEPH_NOSNAP;
2561                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2562                                      : NULL;
2563                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2564
2565                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2566                         struct list_head *next = links->next;
2567
2568                         /* Existing snapshot not in the new snap context */
2569
2570                         if (rbd_dev->snap_id == snap->id)
2571                                 rbd_dev->exists = false;
2572                         rbd_remove_snap_dev(snap);
2573                         dout("%ssnap id %llu has been removed\n",
2574                                 rbd_dev->snap_id == snap->id ?  "mapped " : "",
2575                                 (unsigned long long) snap->id);
2576
2577                         /* Done with this list entry; advance */
2578
2579                         links = next;
2580                         continue;
2581                 }
2582
2583                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2584                                         &snap_size, &snap_features);
2585                 if (IS_ERR(snap_name))
2586                         return PTR_ERR(snap_name);
2587
2588                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2589                         (unsigned long long) snap_id);
2590                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2591                         struct rbd_snap *new_snap;
2592
2593                         /* We haven't seen this snapshot before */
2594
2595                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2596                                         snap_id, snap_size, snap_features);
2597                         if (IS_ERR(new_snap)) {
2598                                 int err = PTR_ERR(new_snap);
2599
2600                                 dout("  failed to add dev, error %d\n", err);
2601
2602                                 return err;
2603                         }
2604
2605                         /* New goes before existing, or at end of list */
2606
2607                         dout("  added dev%s\n", snap ? "" : " at end\n");
2608                         if (snap)
2609                                 list_add_tail(&new_snap->node, &snap->node);
2610                         else
2611                                 list_add_tail(&new_snap->node, head);
2612                 } else {
2613                         /* Already have this one */
2614
2615                         dout("  already present\n");
2616
2617                         rbd_assert(snap->size == snap_size);
2618                         rbd_assert(!strcmp(snap->name, snap_name));
2619                         rbd_assert(snap->features == snap_features);
2620
2621                         /* Done with this list entry; advance */
2622
2623                         links = links->next;
2624                 }
2625
2626                 /* Advance to the next entry in the snapshot context */
2627
2628                 index++;
2629         }
2630         dout("%s: done\n", __func__);
2631
2632         return 0;
2633 }
2634
2635 /*
2636  * Scan the list of snapshots and register the devices for any that
2637  * have not already been registered.
2638  */
2639 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2640 {
2641         struct rbd_snap *snap;
2642         int ret = 0;
2643
2644         dout("%s called\n", __func__);
2645         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2646                 return -EIO;
2647
2648         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2649                 if (!rbd_snap_registered(snap)) {
2650                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2651                         if (ret < 0)
2652                                 break;
2653                 }
2654         }
2655         dout("%s: returning %d\n", __func__, ret);
2656
2657         return ret;
2658 }
2659
2660 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2661 {
2662         struct device *dev;
2663         int ret;
2664
2665         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2666
2667         dev = &rbd_dev->dev;
2668         dev->bus = &rbd_bus_type;
2669         dev->type = &rbd_device_type;
2670         dev->parent = &rbd_root_dev;
2671         dev->release = rbd_dev_release;
2672         dev_set_name(dev, "%d", rbd_dev->dev_id);
2673         ret = device_register(dev);
2674
2675         mutex_unlock(&ctl_mutex);
2676
2677         return ret;
2678 }
2679
2680 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2681 {
2682         device_unregister(&rbd_dev->dev);
2683 }
2684
2685 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2686 {
2687         int ret, rc;
2688
2689         do {
2690                 ret = rbd_req_sync_watch(rbd_dev);
2691                 if (ret == -ERANGE) {
2692                         rc = rbd_dev_refresh(rbd_dev, NULL);
2693                         if (rc < 0)
2694                                 return rc;
2695                 }
2696         } while (ret == -ERANGE);
2697
2698         return ret;
2699 }
2700
2701 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2702
2703 /*
2704  * Get a unique rbd identifier for the given new rbd_dev, and add
2705  * the rbd_dev to the global list.  The minimum rbd id is 1.
2706  */
2707 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2708 {
2709         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2710
2711         spin_lock(&rbd_dev_list_lock);
2712         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2713         spin_unlock(&rbd_dev_list_lock);
2714         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2715                 (unsigned long long) rbd_dev->dev_id);
2716 }
2717
2718 /*
2719  * Remove an rbd_dev from the global list, and record that its
2720  * identifier is no longer in use.
2721  */
2722 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2723 {
2724         struct list_head *tmp;
2725         int rbd_id = rbd_dev->dev_id;
2726         int max_id;
2727
2728         rbd_assert(rbd_id > 0);
2729
2730         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2731                 (unsigned long long) rbd_dev->dev_id);
2732         spin_lock(&rbd_dev_list_lock);
2733         list_del_init(&rbd_dev->node);
2734
2735         /*
2736          * If the id being "put" is not the current maximum, there
2737          * is nothing special we need to do.
2738          */
2739         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2740                 spin_unlock(&rbd_dev_list_lock);
2741                 return;
2742         }
2743
2744         /*
2745          * We need to update the current maximum id.  Search the
2746          * list to find out what it is.  We're more likely to find
2747          * the maximum at the end, so search the list backward.
2748          */
2749         max_id = 0;
2750         list_for_each_prev(tmp, &rbd_dev_list) {
2751                 struct rbd_device *rbd_dev;
2752
2753                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2754                 if (rbd_dev->dev_id > max_id)
2755                         max_id = rbd_dev->dev_id;
2756         }
2757         spin_unlock(&rbd_dev_list_lock);
2758
2759         /*
2760          * The max id could have been updated by rbd_dev_id_get(), in
2761          * which case it now accurately reflects the new maximum.
2762          * Be careful not to overwrite the maximum value in that
2763          * case.
2764          */
2765         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2766         dout("  max dev id has been reset\n");
2767 }
2768
2769 /*
2770  * Skips over white space at *buf, and updates *buf to point to the
2771  * first found non-space character (if any). Returns the length of
2772  * the token (string of non-white space characters) found.  Note
2773  * that *buf must be terminated with '\0'.
2774  */
2775 static inline size_t next_token(const char **buf)
2776 {
2777         /*
2778         * These are the characters that produce nonzero for
2779         * isspace() in the "C" and "POSIX" locales.
2780         */
2781         const char *spaces = " \f\n\r\t\v";
2782
2783         *buf += strspn(*buf, spaces);   /* Find start of token */
2784
2785         return strcspn(*buf, spaces);   /* Return token length */
2786 }
2787
2788 /*
2789  * Finds the next token in *buf, and if the provided token buffer is
2790  * big enough, copies the found token into it.  The result, if
2791  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2792  * must be terminated with '\0' on entry.
2793  *
2794  * Returns the length of the token found (not including the '\0').
2795  * Return value will be 0 if no token is found, and it will be >=
2796  * token_size if the token would not fit.
2797  *
2798  * The *buf pointer will be updated to point beyond the end of the
2799  * found token.  Note that this occurs even if the token buffer is
2800  * too small to hold it.
2801  */
2802 static inline size_t copy_token(const char **buf,
2803                                 char *token,
2804                                 size_t token_size)
2805 {
2806         size_t len;
2807
2808         len = next_token(buf);
2809         if (len < token_size) {
2810                 memcpy(token, *buf, len);
2811                 *(token + len) = '\0';
2812         }
2813         *buf += len;
2814
2815         return len;
2816 }
2817
2818 /*
2819  * Finds the next token in *buf, dynamically allocates a buffer big
2820  * enough to hold a copy of it, and copies the token into the new
2821  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2822  * that a duplicate buffer is created even for a zero-length token.
2823  *
2824  * Returns a pointer to the newly-allocated duplicate, or a null
2825  * pointer if memory for the duplicate was not available.  If
2826  * the lenp argument is a non-null pointer, the length of the token
2827  * (not including the '\0') is returned in *lenp.
2828  *
2829  * If successful, the *buf pointer will be updated to point beyond
2830  * the end of the found token.
2831  *
2832  * Note: uses GFP_KERNEL for allocation.
2833  */
2834 static inline char *dup_token(const char **buf, size_t *lenp)
2835 {
2836         char *dup;
2837         size_t len;
2838
2839         len = next_token(buf);
2840         dup = kmalloc(len + 1, GFP_KERNEL);
2841         if (!dup)
2842                 return NULL;
2843
2844         memcpy(dup, *buf, len);
2845         *(dup + len) = '\0';
2846         *buf += len;
2847
2848         if (lenp)
2849                 *lenp = len;
2850
2851         return dup;
2852 }
2853
2854 /*
2855  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2856  * rbd_md_name, and name fields of the given rbd_dev, based on the
2857  * list of monitor addresses and other options provided via
2858  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2859  * copy of the snapshot name to map if successful, or a
2860  * pointer-coded error otherwise.
2861  *
2862  * Note: rbd_dev is assumed to have been initially zero-filled.
2863  */
2864 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2865                                 const char *buf,
2866                                 const char **mon_addrs,
2867                                 size_t *mon_addrs_size,
2868                                 char *options,
2869                                 size_t options_size)
2870 {
2871         size_t len;
2872         char *err_ptr = ERR_PTR(-EINVAL);
2873         char *snap_name;
2874
2875         /* The first four tokens are required */
2876
2877         len = next_token(&buf);
2878         if (!len)
2879                 return err_ptr;
2880         *mon_addrs_size = len + 1;
2881         *mon_addrs = buf;
2882
2883         buf += len;
2884
2885         len = copy_token(&buf, options, options_size);
2886         if (!len || len >= options_size)
2887                 return err_ptr;
2888
2889         err_ptr = ERR_PTR(-ENOMEM);
2890         rbd_dev->pool_name = dup_token(&buf, NULL);
2891         if (!rbd_dev->pool_name)
2892                 goto out_err;
2893
2894         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2895         if (!rbd_dev->image_name)
2896                 goto out_err;
2897
2898         /* Snapshot name is optional; default is to use "head" */
2899
2900         len = next_token(&buf);
2901         if (len > RBD_MAX_SNAP_NAME_LEN) {
2902                 err_ptr = ERR_PTR(-ENAMETOOLONG);
2903                 goto out_err;
2904         }
2905         if (!len) {
2906                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2907                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2908         }
2909         snap_name = kmalloc(len + 1, GFP_KERNEL);
2910         if (!snap_name)
2911                 goto out_err;
2912         memcpy(snap_name, buf, len);
2913         *(snap_name + len) = '\0';
2914
2915         return snap_name;
2916
2917 out_err:
2918         kfree(rbd_dev->image_name);
2919         rbd_dev->image_name = NULL;
2920         rbd_dev->image_name_len = 0;
2921         kfree(rbd_dev->pool_name);
2922         rbd_dev->pool_name = NULL;
2923
2924         return err_ptr;
2925 }
2926
2927 /*
2928  * An rbd format 2 image has a unique identifier, distinct from the
2929  * name given to it by the user.  Internally, that identifier is
2930  * what's used to specify the names of objects related to the image.
2931  *
2932  * A special "rbd id" object is used to map an rbd image name to its
2933  * id.  If that object doesn't exist, then there is no v2 rbd image
2934  * with the supplied name.
2935  *
2936  * This function will record the given rbd_dev's image_id field if
2937  * it can be determined, and in that case will return 0.  If any
2938  * errors occur a negative errno will be returned and the rbd_dev's
2939  * image_id field will be unchanged (and should be NULL).
2940  */
2941 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2942 {
2943         int ret;
2944         size_t size;
2945         char *object_name;
2946         void *response;
2947         void *p;
2948
2949         /*
2950          * First, see if the format 2 image id file exists, and if
2951          * so, get the image's persistent id from it.
2952          */
2953         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2954         object_name = kmalloc(size, GFP_NOIO);
2955         if (!object_name)
2956                 return -ENOMEM;
2957         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2958         dout("rbd id object name is %s\n", object_name);
2959
2960         /* Response will be an encoded string, which includes a length */
2961
2962         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2963         response = kzalloc(size, GFP_NOIO);
2964         if (!response) {
2965                 ret = -ENOMEM;
2966                 goto out;
2967         }
2968
2969         ret = rbd_req_sync_exec(rbd_dev, object_name,
2970                                 "rbd", "get_id",
2971                                 NULL, 0,
2972                                 response, RBD_IMAGE_ID_LEN_MAX,
2973                                 CEPH_OSD_FLAG_READ, NULL);
2974         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2975         if (ret < 0)
2976                 goto out;
2977         ret = 0;    /* rbd_req_sync_exec() can return positive */
2978
2979         p = response;
2980         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2981                                                 p + RBD_IMAGE_ID_LEN_MAX,
2982                                                 &rbd_dev->image_id_len,
2983                                                 GFP_NOIO);
2984         if (IS_ERR(rbd_dev->image_id)) {
2985                 ret = PTR_ERR(rbd_dev->image_id);
2986                 rbd_dev->image_id = NULL;
2987         } else {
2988                 dout("image_id is %s\n", rbd_dev->image_id);
2989         }
2990 out:
2991         kfree(response);
2992         kfree(object_name);
2993
2994         return ret;
2995 }
2996
2997 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2998 {
2999         int ret;
3000         size_t size;
3001
3002         /* Version 1 images have no id; empty string is used */
3003
3004         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3005         if (!rbd_dev->image_id)
3006                 return -ENOMEM;
3007         rbd_dev->image_id_len = 0;
3008
3009         /* Record the header object name for this rbd image. */
3010
3011         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3012         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3013         if (!rbd_dev->header_name) {
3014                 ret = -ENOMEM;
3015                 goto out_err;
3016         }
3017         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3018
3019         /* Populate rbd image metadata */
3020
3021         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3022         if (ret < 0)
3023                 goto out_err;
3024         rbd_dev->image_format = 1;
3025
3026         dout("discovered version 1 image, header name is %s\n",
3027                 rbd_dev->header_name);
3028
3029         return 0;
3030
3031 out_err:
3032         kfree(rbd_dev->header_name);
3033         rbd_dev->header_name = NULL;
3034         kfree(rbd_dev->image_id);
3035         rbd_dev->image_id = NULL;
3036
3037         return ret;
3038 }
3039
3040 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3041 {
3042         size_t size;
3043         int ret;
3044         u64 ver = 0;
3045
3046         /*
3047          * Image id was filled in by the caller.  Record the header
3048          * object name for this rbd image.
3049          */
3050         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3051         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3052         if (!rbd_dev->header_name)
3053                 return -ENOMEM;
3054         sprintf(rbd_dev->header_name, "%s%s",
3055                         RBD_HEADER_PREFIX, rbd_dev->image_id);
3056
3057         /* Get the size and object order for the image */
3058
3059         ret = rbd_dev_v2_image_size(rbd_dev);
3060         if (ret < 0)
3061                 goto out_err;
3062
3063         /* Get the object prefix (a.k.a. block_name) for the image */
3064
3065         ret = rbd_dev_v2_object_prefix(rbd_dev);
3066         if (ret < 0)
3067                 goto out_err;
3068
3069         /* Get the and check features for the image */
3070
3071         ret = rbd_dev_v2_features(rbd_dev);
3072         if (ret < 0)
3073                 goto out_err;
3074
3075         /* crypto and compression type aren't (yet) supported for v2 images */
3076
3077         rbd_dev->header.crypt_type = 0;
3078         rbd_dev->header.comp_type = 0;
3079
3080         /* Get the snapshot context, plus the header version */
3081
3082         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3083         if (ret)
3084                 goto out_err;
3085         rbd_dev->header.obj_version = ver;
3086
3087         rbd_dev->image_format = 2;
3088
3089         dout("discovered version 2 image, header name is %s\n",
3090                 rbd_dev->header_name);
3091
3092         return 0;
3093 out_err:
3094         kfree(rbd_dev->header_name);
3095         rbd_dev->header_name = NULL;
3096         kfree(rbd_dev->header.object_prefix);
3097         rbd_dev->header.object_prefix = NULL;
3098
3099         return ret;
3100 }
3101
3102 /*
3103  * Probe for the existence of the header object for the given rbd
3104  * device.  For format 2 images this includes determining the image
3105  * id.
3106  */
3107 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3108 {
3109         int ret;
3110
3111         /*
3112          * Get the id from the image id object.  If it's not a
3113          * format 2 image, we'll get ENOENT back, and we'll assume
3114          * it's a format 1 image.
3115          */
3116         ret = rbd_dev_image_id(rbd_dev);
3117         if (ret)
3118                 ret = rbd_dev_v1_probe(rbd_dev);
3119         else
3120                 ret = rbd_dev_v2_probe(rbd_dev);
3121         if (ret)
3122                 dout("probe failed, returning %d\n", ret);
3123
3124         return ret;
3125 }
3126
3127 static ssize_t rbd_add(struct bus_type *bus,
3128                        const char *buf,
3129                        size_t count)
3130 {
3131         char *options;
3132         struct rbd_device *rbd_dev = NULL;
3133         const char *mon_addrs = NULL;
3134         size_t mon_addrs_size = 0;
3135         struct ceph_osd_client *osdc;
3136         int rc = -ENOMEM;
3137         char *snap_name;
3138
3139         if (!try_module_get(THIS_MODULE))
3140                 return -ENODEV;
3141
3142         options = kmalloc(count, GFP_KERNEL);
3143         if (!options)
3144                 goto err_out_mem;
3145         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3146         if (!rbd_dev)
3147                 goto err_out_mem;
3148
3149         /* static rbd_device initialization */
3150         spin_lock_init(&rbd_dev->lock);
3151         INIT_LIST_HEAD(&rbd_dev->node);
3152         INIT_LIST_HEAD(&rbd_dev->snaps);
3153         init_rwsem(&rbd_dev->header_rwsem);
3154
3155         /* parse add command */
3156         snap_name = rbd_add_parse_args(rbd_dev, buf,
3157                                 &mon_addrs, &mon_addrs_size, options, count);
3158         if (IS_ERR(snap_name)) {
3159                 rc = PTR_ERR(snap_name);
3160                 goto err_out_mem;
3161         }
3162
3163         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3164         if (rc < 0)
3165                 goto err_out_args;
3166
3167         /* pick the pool */
3168         osdc = &rbd_dev->rbd_client->client->osdc;
3169         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3170         if (rc < 0)
3171                 goto err_out_client;
3172         rbd_dev->pool_id = (u64) rc;
3173
3174         rc = rbd_dev_probe(rbd_dev);
3175         if (rc < 0)
3176                 goto err_out_client;
3177
3178         /* no need to lock here, as rbd_dev is not registered yet */
3179         rc = rbd_dev_snaps_update(rbd_dev);
3180         if (rc)
3181                 goto err_out_probe;
3182
3183         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3184         if (rc)
3185                 goto err_out_snaps;
3186
3187         /* generate unique id: find highest unique id, add one */
3188         rbd_dev_id_get(rbd_dev);
3189
3190         /* Fill in the device name, now that we have its id. */
3191         BUILD_BUG_ON(DEV_NAME_LEN
3192                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3193         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3194
3195         /* Get our block major device number. */
3196
3197         rc = register_blkdev(0, rbd_dev->name);
3198         if (rc < 0)
3199                 goto err_out_id;
3200         rbd_dev->major = rc;
3201
3202         /* Set up the blkdev mapping. */
3203
3204         rc = rbd_init_disk(rbd_dev);
3205         if (rc)
3206                 goto err_out_blkdev;
3207
3208         rc = rbd_bus_add_dev(rbd_dev);
3209         if (rc)
3210                 goto err_out_disk;
3211
3212         /*
3213          * At this point cleanup in the event of an error is the job
3214          * of the sysfs code (initiated by rbd_bus_del_dev()).
3215          */
3216
3217         down_write(&rbd_dev->header_rwsem);
3218         rc = rbd_dev_snaps_register(rbd_dev);
3219         up_write(&rbd_dev->header_rwsem);
3220         if (rc)
3221                 goto err_out_bus;
3222
3223         rc = rbd_init_watch_dev(rbd_dev);
3224         if (rc)
3225                 goto err_out_bus;
3226
3227         /* Everything's ready.  Announce the disk to the world. */
3228
3229         add_disk(rbd_dev->disk);
3230
3231         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3232                 (unsigned long long) rbd_dev->mapping.size);
3233
3234         return count;
3235
3236 err_out_bus:
3237         /* this will also clean up rest of rbd_dev stuff */
3238
3239         rbd_bus_del_dev(rbd_dev);
3240         kfree(options);
3241         return rc;
3242
3243 err_out_disk:
3244         rbd_free_disk(rbd_dev);
3245 err_out_blkdev:
3246         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3247 err_out_id:
3248         rbd_dev_id_put(rbd_dev);
3249 err_out_snaps:
3250         rbd_remove_all_snaps(rbd_dev);
3251 err_out_probe:
3252         rbd_header_free(&rbd_dev->header);
3253 err_out_client:
3254         kfree(rbd_dev->header_name);
3255         rbd_put_client(rbd_dev);
3256         kfree(rbd_dev->image_id);
3257 err_out_args:
3258         kfree(rbd_dev->snap_name);
3259         kfree(rbd_dev->image_name);
3260         kfree(rbd_dev->pool_name);
3261 err_out_mem:
3262         kfree(rbd_dev);
3263         kfree(options);
3264
3265         dout("Error adding device %s\n", buf);
3266         module_put(THIS_MODULE);
3267
3268         return (ssize_t) rc;
3269 }
3270
3271 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3272 {
3273         struct list_head *tmp;
3274         struct rbd_device *rbd_dev;
3275
3276         spin_lock(&rbd_dev_list_lock);
3277         list_for_each(tmp, &rbd_dev_list) {
3278                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3279                 if (rbd_dev->dev_id == dev_id) {
3280                         spin_unlock(&rbd_dev_list_lock);
3281                         return rbd_dev;
3282                 }
3283         }
3284         spin_unlock(&rbd_dev_list_lock);
3285         return NULL;
3286 }
3287
3288 static void rbd_dev_release(struct device *dev)
3289 {
3290         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3291
3292         if (rbd_dev->watch_request) {
3293                 struct ceph_client *client = rbd_dev->rbd_client->client;
3294
3295                 ceph_osdc_unregister_linger_request(&client->osdc,
3296                                                     rbd_dev->watch_request);
3297         }
3298         if (rbd_dev->watch_event)
3299                 rbd_req_sync_unwatch(rbd_dev);
3300
3301         rbd_put_client(rbd_dev);
3302
3303         /* clean up and free blkdev */
3304         rbd_free_disk(rbd_dev);
3305         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3306
3307         /* release allocated disk header fields */
3308         rbd_header_free(&rbd_dev->header);
3309
3310         /* done with the id, and with the rbd_dev */
3311         kfree(rbd_dev->snap_name);
3312         kfree(rbd_dev->image_id);
3313         kfree(rbd_dev->header_name);
3314         kfree(rbd_dev->pool_name);
3315         kfree(rbd_dev->image_name);
3316         rbd_dev_id_put(rbd_dev);
3317         kfree(rbd_dev);
3318
3319         /* release module ref */
3320         module_put(THIS_MODULE);
3321 }
3322
3323 static ssize_t rbd_remove(struct bus_type *bus,
3324                           const char *buf,
3325                           size_t count)
3326 {
3327         struct rbd_device *rbd_dev = NULL;
3328         int target_id, rc;
3329         unsigned long ul;
3330         int ret = count;
3331
3332         rc = strict_strtoul(buf, 10, &ul);
3333         if (rc)
3334                 return rc;
3335
3336         /* convert to int; abort if we lost anything in the conversion */
3337         target_id = (int) ul;
3338         if (target_id != ul)
3339                 return -EINVAL;
3340
3341         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3342
3343         rbd_dev = __rbd_get_dev(target_id);
3344         if (!rbd_dev) {
3345                 ret = -ENOENT;
3346                 goto done;
3347         }
3348
3349         rbd_remove_all_snaps(rbd_dev);
3350         rbd_bus_del_dev(rbd_dev);
3351
3352 done:
3353         mutex_unlock(&ctl_mutex);
3354
3355         return ret;
3356 }
3357
3358 /*
3359  * create control files in sysfs
3360  * /sys/bus/rbd/...
3361  */
3362 static int rbd_sysfs_init(void)
3363 {
3364         int ret;
3365
3366         ret = device_register(&rbd_root_dev);
3367         if (ret < 0)
3368                 return ret;
3369
3370         ret = bus_register(&rbd_bus_type);
3371         if (ret < 0)
3372                 device_unregister(&rbd_root_dev);
3373
3374         return ret;
3375 }
3376
3377 static void rbd_sysfs_cleanup(void)
3378 {
3379         bus_unregister(&rbd_bus_type);
3380         device_unregister(&rbd_root_dev);
3381 }
3382
3383 int __init rbd_init(void)
3384 {
3385         int rc;
3386
3387         rc = rbd_sysfs_init();
3388         if (rc)
3389                 return rc;
3390         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3391         return 0;
3392 }
3393
3394 void __exit rbd_exit(void)
3395 {
3396         rbd_sysfs_cleanup();
3397 }
3398
3399 module_init(rbd_init);
3400 module_exit(rbd_exit);
3401
3402 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3403 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3404 MODULE_DESCRIPTION("rados block device");
3405
3406 /* following authorship retained from original osdblk.c */
3407 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3408
3409 MODULE_LICENSE("GPL");