/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
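
/*
 * Rationale for the formula above: each byte of an int contributes at
 * most 2.5 decimal digits (an over-estimate, since log10(256) is about
 * 2.41), so a 4-byte int needs at most (5 * 4) / 2 = 10 digits; the
 * extra 1 leaves room for a leading minus sign.
 */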

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
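
/*
 * For a mapping of the image head (i.e., no snapshot), snap_id is
 * CEPH_NOSNAP and snap_name is RBD_SNAP_HEAD_NAME ("-"); see
 * rbd_snap_name() and rbd_dev_mapping_set() below.
 */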

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};
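
/*
 * Summary of the three object request flavors described above:
 *
 *      standalone:     which == BAD_WHICH, null obj_request pointer
 *      stat (layered): which == BAD_WHICH, non-null obj_request pointer
 *      image data:     which != BAD_WHICH, non-null img_request pointer
 */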

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
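
/*
 * Typical traversal, e.g. computing the aggregate transfer count in
 * rbd_img_request_complete() below:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 */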

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
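
/*
 * Note: the debug expansion above is a bare if statement rather than
 * the usual do { ... } while (0), so rbd_assert() should not be used
 * as the sole body of an if/else without braces.
 */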

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}
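
/*
 * Note: the found flag is needed because when list_for_each_entry()
 * runs off the end of the list, client_node is left pointing at the
 * list head rather than a valid rbd_client, so the cursor itself
 * can't be tested.
 */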

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
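
/*
 * Example: a "read_only" (or "ro") token in the options string sets
 * rbd_opts->read_only to true; "read_write" (or "rw") sets it back
 * to false, which matches RBD_READ_ONLY_DEFAULT.
 */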

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}
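
/*
 * The v1 on-disk format decoded above: a fixed-size header, followed
 * by snap_count rbd_image_snap_ondisk entries (id, image_size; see
 * rbd_types.h), and then snap_names_len bytes of NUL-terminated
 * snapshot names packed end to end.
 */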

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
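
/*
 * Example: with obj_order 22 (4 MiB objects), an image request at
 * offset 0x3ff000 for 0x3000 bytes crosses a segment boundary, so
 * rbd_segment_length() clips it to 0x1000 bytes; the remaining
 * 0x2000 bytes are issued against the next object.
 */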

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}
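
/*
 * An image created with the default order of 22, for instance, is
 * backed by 4 MiB objects (1 << 22).
 */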

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
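
/*
 * This is used on the read path: a short read or an ENOENT (a hole
 * in the image) leaves the tail of the request unwritten, so it is
 * zero-filled here (see rbd_img_obj_request_read_callback() below).
 */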

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}
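
/*
 * Example (assuming 4 KiB pages): zero_pages(pages, 0x1800, 0x3000)
 * zeroes the last 2 KiB of pages[1] and all of pages[2].
 */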

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
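
/*
 * A caller splitting image I/O on object boundaries can walk the
 * original request's bio chain with successive calls like
 * (illustrative variable names):
 *
 *      bio = bio_chain_clone_range(&bio_list, &bio_offset,
 *                                  segment_length, GFP_ATOMIC);
 *
 * each call consuming segment_length bytes and leaving bio_list and
 * bio_offset positioned at the first byte not yet cloned.
 */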

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}
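
/*
 * A typical synchronous caller pairs this with rbd_obj_request_submit():
 * submit the request, wait here for it to complete, then examine
 * obj_request->result and obj_request->xferred.
 */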
1335
1336 /*
1337  * The default/initial value for all image request flags is 0.  Each
1338  * is conditionally set to 1 at image request initialization time
1339  * and currently never change thereafter.
1340  */
1341 static void img_request_write_set(struct rbd_img_request *img_request)
1342 {
1343         set_bit(IMG_REQ_WRITE, &img_request->flags);
1344         smp_mb();
1345 }
1346
1347 static bool img_request_write_test(struct rbd_img_request *img_request)
1348 {
1349         smp_mb();
1350         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1351 }
1352
1353 static void img_request_child_set(struct rbd_img_request *img_request)
1354 {
1355         set_bit(IMG_REQ_CHILD, &img_request->flags);
1356         smp_mb();
1357 }
1358
1359 static bool img_request_child_test(struct rbd_img_request *img_request)
1360 {
1361         smp_mb();
1362         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1363 }
1364
1365 static void img_request_layered_set(struct rbd_img_request *img_request)
1366 {
1367         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1368         smp_mb();
1369 }
1370
1371 static bool img_request_layered_test(struct rbd_img_request *img_request)
1372 {
1373         smp_mb();
1374         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1375 }
1376
1377 static void
1378 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1379 {
1380         u64 xferred = obj_request->xferred;
1381         u64 length = obj_request->length;
1382
1383         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1384                 obj_request, obj_request->img_request, obj_request->result,
1385                 xferred, length);
1386         /*
1387          * ENOENT means a hole in the image.  We zero-fill the
1388          * entire length of the request.  A short read also implies
1389          * zero-fill to the end of the request.  Either way we
1390          * update the xferred count to indicate the whole request
1391          * was satisfied.
1392          */
1393         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1394         if (obj_request->result == -ENOENT) {
1395                 if (obj_request->type == OBJ_REQUEST_BIO)
1396                         zero_bio_chain(obj_request->bio_list, 0);
1397                 else
1398                         zero_pages(obj_request->pages, 0, length);
1399                 obj_request->result = 0;
1400                 obj_request->xferred = length;
1401         } else if (xferred < length && !obj_request->result) {
1402                 if (obj_request->type == OBJ_REQUEST_BIO)
1403                         zero_bio_chain(obj_request->bio_list, xferred);
1404                 else
1405                         zero_pages(obj_request->pages, xferred, length);
1406                 obj_request->xferred = length;
1407         }
1408         obj_request_done_set(obj_request);
1409 }
1410
1411 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1412 {
1413         dout("%s: obj %p cb %p\n", __func__, obj_request,
1414                 obj_request->callback);
1415         if (obj_request->callback)
1416                 obj_request->callback(obj_request);
1417         else
1418                 complete_all(&obj_request->completion);
1419 }
1420
1421 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1422 {
1423         dout("%s: obj %p\n", __func__, obj_request);
1424         obj_request_done_set(obj_request);
1425 }
1426
1427 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1428 {
1429         struct rbd_img_request *img_request = NULL;
1430         struct rbd_device *rbd_dev = NULL;
1431         bool layered = false;
1432
1433         if (obj_request_img_data_test(obj_request)) {
1434                 img_request = obj_request->img_request;
1435                 layered = img_request && img_request_layered_test(img_request);
1436                 rbd_dev = img_request->rbd_dev;
1437         }
1438
1439         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1440                 obj_request, img_request, obj_request->result,
1441                 obj_request->xferred, obj_request->length);
1442         if (layered && obj_request->result == -ENOENT &&
1443                         obj_request->img_offset < rbd_dev->parent_overlap)
1444                 rbd_img_parent_read(obj_request);
1445         else if (img_request)
1446                 rbd_img_obj_request_read_callback(obj_request);
1447         else
1448                 obj_request_done_set(obj_request);
1449 }
1450
1451 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1452 {
1453         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1454                 obj_request->result, obj_request->length);
1455         /*
1456          * There is no such thing as a successful short write.  Set
1457          * it to our originally-requested length.
1458          */
1459         obj_request->xferred = obj_request->length;
1460         obj_request_done_set(obj_request);
1461 }
1462
1463 /*
1464  * For a simple stat call there's nothing to do.  We'll do more if
1465  * this is part of a write sequence for a layered image.
1466  */
1467 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1468 {
1469         dout("%s: obj %p\n", __func__, obj_request);
1470         obj_request_done_set(obj_request);
1471 }
1472
1473 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1474                                 struct ceph_msg *msg)
1475 {
1476         struct rbd_obj_request *obj_request = osd_req->r_priv;
1477         u16 opcode;
1478
1479         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1480         rbd_assert(osd_req == obj_request->osd_req);
1481         if (obj_request_img_data_test(obj_request)) {
1482                 rbd_assert(obj_request->img_request);
1483                 rbd_assert(obj_request->which != BAD_WHICH);
1484         } else {
1485                 rbd_assert(obj_request->which == BAD_WHICH);
1486         }
1487
1488         if (osd_req->r_result < 0)
1489                 obj_request->result = osd_req->r_result;
1490         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1491
1492         BUG_ON(osd_req->r_num_ops > 2);
1493
1494         /*
1495          * We support a 64-bit length, but ultimately it has to be
1496          * passed to blk_end_request(), which takes an unsigned int.
1497          */
1498         obj_request->xferred = osd_req->r_reply_op_len[0];
1499         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1500         opcode = osd_req->r_ops[0].op;
1501         switch (opcode) {
1502         case CEPH_OSD_OP_READ:
1503                 rbd_osd_read_callback(obj_request);
1504                 break;
1505         case CEPH_OSD_OP_WRITE:
1506                 rbd_osd_write_callback(obj_request);
1507                 break;
1508         case CEPH_OSD_OP_STAT:
1509                 rbd_osd_stat_callback(obj_request);
1510                 break;
1511         case CEPH_OSD_OP_CALL:
1512         case CEPH_OSD_OP_NOTIFY_ACK:
1513         case CEPH_OSD_OP_WATCH:
1514                 rbd_osd_trivial_callback(obj_request);
1515                 break;
1516         default:
1517                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1518                         obj_request->object_name, (unsigned short) opcode);
1519                 break;
1520         }
1521
1522         if (obj_request_done_test(obj_request))
1523                 rbd_obj_request_complete(obj_request);
1524 }
1525
1526 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1527 {
1528         struct rbd_img_request *img_request = obj_request->img_request;
1529         struct ceph_osd_request *osd_req = obj_request->osd_req;
1530         u64 snap_id;
1531
1532         rbd_assert(osd_req != NULL);
1533
1534         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1535         ceph_osdc_build_request(osd_req, obj_request->offset,
1536                         NULL, snap_id, NULL);
1537 }
1538
1539 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1540 {
1541         struct rbd_img_request *img_request = obj_request->img_request;
1542         struct ceph_osd_request *osd_req = obj_request->osd_req;
1543         struct ceph_snap_context *snapc;
1544         struct timespec mtime = CURRENT_TIME;
1545
1546         rbd_assert(osd_req != NULL);
1547
1548         snapc = img_request ? img_request->snapc : NULL;
1549         ceph_osdc_build_request(osd_req, obj_request->offset,
1550                         snapc, CEPH_NOSNAP, &mtime);
1551 }
1552
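     /*
      * Allocate and initialize an osd request with a single op on behalf
      * of the given object request.  A write directed at image data gets
      * the image's snapshot context.
      */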
1553 static struct ceph_osd_request *rbd_osd_req_create(
1554                                         struct rbd_device *rbd_dev,
1555                                         bool write_request,
1556                                         struct rbd_obj_request *obj_request)
1557 {
1558         struct ceph_snap_context *snapc = NULL;
1559         struct ceph_osd_client *osdc;
1560         struct ceph_osd_request *osd_req;
1561
1562         if (obj_request_img_data_test(obj_request)) {
1563                 struct rbd_img_request *img_request = obj_request->img_request;
1564
1565                 rbd_assert(write_request ==
1566                                 img_request_write_test(img_request));
1567                 if (write_request)
1568                         snapc = img_request->snapc;
1569         }
1570
1571         /* Allocate and initialize the request, for the single op */
1572
1573         osdc = &rbd_dev->rbd_client->client->osdc;
1574         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1575         if (!osd_req)
1576                 return NULL;    /* ENOMEM */
1577
1578         if (write_request)
1579                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1580         else
1581                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1582
1583         osd_req->r_callback = rbd_osd_req_callback;
1584         osd_req->r_priv = obj_request;
1585
1586         osd_req->r_oid_len = strlen(obj_request->object_name);
1587         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1588         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1589
1590         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1591
1592         return osd_req;
1593 }
1594
1595 /*
1596  * Create a copyup osd request based on the information in the
1597  * object request supplied.  A copyup request has two osd ops:
1598  * a copyup method call and a "normal" write request.
1599  */
1600 static struct ceph_osd_request *
1601 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1602 {
1603         struct rbd_img_request *img_request;
1604         struct ceph_snap_context *snapc;
1605         struct rbd_device *rbd_dev;
1606         struct ceph_osd_client *osdc;
1607         struct ceph_osd_request *osd_req;
1608
1609         rbd_assert(obj_request_img_data_test(obj_request));
1610         img_request = obj_request->img_request;
1611         rbd_assert(img_request);
1612         rbd_assert(img_request_write_test(img_request));
1613
1614         /* Allocate and initialize the request, for the two ops */
1615
1616         snapc = img_request->snapc;
1617         rbd_dev = img_request->rbd_dev;
1618         osdc = &rbd_dev->rbd_client->client->osdc;
1619         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1620         if (!osd_req)
1621                 return NULL;    /* ENOMEM */
1622
1623         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1624         osd_req->r_callback = rbd_osd_req_callback;
1625         osd_req->r_priv = obj_request;
1626
1627         osd_req->r_oid_len = strlen(obj_request->object_name);
1628         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1629         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1630
1631         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1632
1633         return osd_req;
1634 }
1635
1636
1637 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1638 {
1639         ceph_osdc_put_request(osd_req);
1640 }
1641
1642 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1643
1644 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1645                                                 u64 offset, u64 length,
1646                                                 enum obj_request_type type)
1647 {
1648         struct rbd_obj_request *obj_request;
1649         size_t size;
1650         char *name;
1651
1652         rbd_assert(obj_request_type_valid(type));
1653
1654         size = strlen(object_name) + 1;
1655         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1656         if (!obj_request)
1657                 return NULL;
1658
1659         name = (char *)(obj_request + 1);
1660         obj_request->object_name = memcpy(name, object_name, size);
1661         obj_request->offset = offset;
1662         obj_request->length = length;
1663         obj_request->flags = 0;
1664         obj_request->which = BAD_WHICH;
1665         obj_request->type = type;
1666         INIT_LIST_HEAD(&obj_request->links);
1667         init_completion(&obj_request->completion);
1668         kref_init(&obj_request->kref);
1669
1670         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1671                 offset, length, (int)type, obj_request);
1672
1673         return obj_request;
1674 }
1675
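     /*
      * Free an object request once its last reference is dropped, along
      * with its osd request and whatever bio chain or page vector it
      * owns.
      */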
1676 static void rbd_obj_request_destroy(struct kref *kref)
1677 {
1678         struct rbd_obj_request *obj_request;
1679
1680         obj_request = container_of(kref, struct rbd_obj_request, kref);
1681
1682         dout("%s: obj %p\n", __func__, obj_request);
1683
1684         rbd_assert(obj_request->img_request == NULL);
1685         rbd_assert(obj_request->which == BAD_WHICH);
1686
1687         if (obj_request->osd_req)
1688                 rbd_osd_req_destroy(obj_request->osd_req);
1689
1690         rbd_assert(obj_request_type_valid(obj_request->type));
1691         switch (obj_request->type) {
1692         case OBJ_REQUEST_NODATA:
1693                 break;          /* Nothing to do */
1694         case OBJ_REQUEST_BIO:
1695                 if (obj_request->bio_list)
1696                         bio_chain_put(obj_request->bio_list);
1697                 break;
1698         case OBJ_REQUEST_PAGES:
1699                 if (obj_request->pages)
1700                         ceph_release_page_vector(obj_request->pages,
1701                                                 obj_request->page_count);
1702                 break;
1703         }
1704
1705         kfree(obj_request);
1706 }
1707
1708 /*
1709  * Caller is responsible for filling in the list of object requests
1710  * that comprises the image request, and the Linux request pointer
1711  * (if there is one).
1712  */
1713 static struct rbd_img_request *rbd_img_request_create(
1714                                         struct rbd_device *rbd_dev,
1715                                         u64 offset, u64 length,
1716                                         bool write_request,
1717                                         bool child_request)
1718 {
1719         struct rbd_img_request *img_request;
             struct ceph_snap_context *snapc = NULL;
1720
1721         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1722         if (!img_request)
1723                 return NULL;
1724
1725         if (write_request) {
1726                 down_read(&rbd_dev->header_rwsem);
1727                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1728                 up_read(&rbd_dev->header_rwsem);
1729         }
1730
1731         img_request->rq = NULL;
1732         img_request->rbd_dev = rbd_dev;
1733         img_request->offset = offset;
1734         img_request->length = length;
1735         img_request->flags = 0;
1736         if (write_request) {
1737                 img_request_write_set(img_request);
1738                 img_request->snapc = snapc;
1739         } else {
1740                 img_request->snap_id = rbd_dev->spec->snap_id;
1741         }
1742         if (child_request)
1743                 img_request_child_set(img_request);
1744         if (rbd_dev->parent_spec)
1745                 img_request_layered_set(img_request);
1746         spin_lock_init(&img_request->completion_lock);
1747         img_request->next_completion = 0;
1748         img_request->callback = NULL;
1749         img_request->result = 0;
1750         img_request->obj_request_count = 0;
1751         INIT_LIST_HEAD(&img_request->obj_requests);
1752         kref_init(&img_request->kref);
1753
1754         rbd_img_request_get(img_request);       /* Avoid "defined but not used" */
1755         rbd_img_request_put(img_request);       /* TEMPORARY; this pair is a no-op */
1756
1757         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1758                 write_request ? "write" : "read", offset, length,
1759                 img_request);
1760
1761         return img_request;
1762 }
1763
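     /*
      * Free an image request once its last reference is dropped.  All of
      * its object requests get released, as do its snapshot context (for
      * a write) and its reference to the originating object request (for
      * a child request).
      */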
1764 static void rbd_img_request_destroy(struct kref *kref)
1765 {
1766         struct rbd_img_request *img_request;
1767         struct rbd_obj_request *obj_request;
1768         struct rbd_obj_request *next_obj_request;
1769
1770         img_request = container_of(kref, struct rbd_img_request, kref);
1771
1772         dout("%s: img %p\n", __func__, img_request);
1773
1774         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1775                 rbd_img_obj_request_del(img_request, obj_request);
1776         rbd_assert(img_request->obj_request_count == 0);
1777
1778         if (img_request_write_test(img_request))
1779                 ceph_put_snap_context(img_request->snapc);
1780
1781         if (img_request_child_test(img_request))
1782                 rbd_obj_request_put(img_request->obj_request);
1783
1784         kfree(img_request);
1785 }
1786
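     /*
      * Finish up an object request that was part of an image request.
      * The return value indicates whether more object requests remain
      * to be completed for the image request.
      */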
1787 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1788 {
1789         struct rbd_img_request *img_request;
1790         unsigned int xferred;
1791         int result;
1792         bool more;
1793
1794         rbd_assert(obj_request_img_data_test(obj_request));
1795         img_request = obj_request->img_request;
1796
1797         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1798         xferred = (unsigned int)obj_request->xferred;
1799         result = obj_request->result;
1800         if (result) {
1801                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1802
1803                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1804                         img_request_write_test(img_request) ? "write" : "read",
1805                         obj_request->length, obj_request->img_offset,
1806                         obj_request->offset);
1807                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1808                         result, xferred);
1809                 if (!img_request->result)
1810                         img_request->result = result;
1811         }
1812
1813         /* Image object requests don't own their page array */
1814
1815         if (obj_request->type == OBJ_REQUEST_PAGES) {
1816                 obj_request->pages = NULL;
1817                 obj_request->page_count = 0;
1818         }
1819
1820         if (img_request_child_test(img_request)) {
1821                 rbd_assert(img_request->obj_request != NULL);
1822                 more = obj_request->which < img_request->obj_request_count - 1;
1823         } else {
1824                 rbd_assert(img_request->rq != NULL);
1825                 more = blk_end_request(img_request->rq, result, xferred);
1826         }
1827
1828         return more;
1829 }
1830
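     /*
      * Callback for object requests belonging to an image request.
      * Object requests can finish in any order, but they are accounted
      * for in order:  completion only advances through the consecutive
      * run of finished requests starting at next_completion.  The image
      * request itself is completed once the last of them has been ended.
      */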
1831 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1832 {
1833         struct rbd_img_request *img_request;
1834         u32 which = obj_request->which;
1835         bool more = true;
1836
1837         rbd_assert(obj_request_img_data_test(obj_request));
1838         img_request = obj_request->img_request;
1839
1840         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1841         rbd_assert(img_request != NULL);
1842         rbd_assert(img_request->obj_request_count > 0);
1843         rbd_assert(which != BAD_WHICH);
1844         rbd_assert(which < img_request->obj_request_count);
1845         rbd_assert(which >= img_request->next_completion);
1846
1847         spin_lock_irq(&img_request->completion_lock);
1848         if (which != img_request->next_completion)
1849                 goto out;
1850
1851         for_each_obj_request_from(img_request, obj_request) {
1852                 rbd_assert(more);
1853                 rbd_assert(which < img_request->obj_request_count);
1854
1855                 if (!obj_request_done_test(obj_request))
1856                         break;
1857                 more = rbd_img_obj_end_request(obj_request);
1858                 which++;
1859         }
1860
1861         rbd_assert(more ^ (which == img_request->obj_request_count));
1862         img_request->next_completion = which;
1863 out:
1864         spin_unlock_irq(&img_request->completion_lock);
1865
1866         if (!more)
1867                 rbd_img_request_complete(img_request);
1868 }
1869
1870 /*
1871  * Split up an image request into one or more object requests, each
1872  * to a different object.  The "type" parameter indicates whether
1873  * "data_desc" is the pointer to the head of a list of bio
1874  * structures, or the base of a page array.  In either case this
1875  * function assumes data_desc describes memory sufficient to hold
1876  * all data described by the image request.
1877  */
1878 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1879                                         enum obj_request_type type,
1880                                         void *data_desc)
1881 {
1882         struct rbd_device *rbd_dev = img_request->rbd_dev;
1883         struct rbd_obj_request *obj_request = NULL;
1884         struct rbd_obj_request *next_obj_request;
1885         bool write_request = img_request_write_test(img_request);
1886         struct bio *bio_list;
1887         unsigned int bio_offset = 0;
1888         struct page **pages;
1889         u64 img_offset;
1890         u64 resid;
1891         u16 opcode;
1892
1893         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1894                 (int)type, data_desc);
1895
1896         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1897         img_offset = img_request->offset;
1898         resid = img_request->length;
1899         rbd_assert(resid > 0);
1900
1901         if (type == OBJ_REQUEST_BIO) {
1902                 bio_list = data_desc;
1903                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1904         } else {
1905                 rbd_assert(type == OBJ_REQUEST_PAGES);
1906                 pages = data_desc;
1907         }
1908
1909         while (resid) {
1910                 struct ceph_osd_request *osd_req;
1911                 const char *object_name;
1912                 u64 offset;
1913                 u64 length;
1914
1915                 object_name = rbd_segment_name(rbd_dev, img_offset);
1916                 if (!object_name)
1917                         goto out_unwind;
1918                 offset = rbd_segment_offset(rbd_dev, img_offset);
1919                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1920                 obj_request = rbd_obj_request_create(object_name,
1921                                                 offset, length, type);
1922                 kfree(object_name);     /* object request has its own copy */
1923                 if (!obj_request)
1924                         goto out_unwind;
1925
1926                 if (type == OBJ_REQUEST_BIO) {
1927                         unsigned int clone_size;
1928
1929                         rbd_assert(length <= (u64)UINT_MAX);
1930                         clone_size = (unsigned int)length;
1931                         obj_request->bio_list =
1932                                         bio_chain_clone_range(&bio_list,
1933                                                                 &bio_offset,
1934                                                                 clone_size,
1935                                                                 GFP_ATOMIC);
1936                         if (!obj_request->bio_list)
1937                                 goto out_partial;
1938                 } else {
1939                         unsigned int page_count;
1940
1941                         obj_request->pages = pages;
1942                         page_count = (u32)calc_pages_for(offset, length);
1943                         obj_request->page_count = page_count;
1944                         if ((offset + length) & ~PAGE_MASK)
1945                                 page_count--;   /* more on last page */
1946                         pages += page_count;
1947                 }
1948
1949                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1950                                                 obj_request);
1951                 if (!osd_req)
1952                         goto out_partial;
1953                 obj_request->osd_req = osd_req;
1954                 obj_request->callback = rbd_img_obj_callback;
1955
1956                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1957                                                 0, 0);
1958                 if (type == OBJ_REQUEST_BIO)
1959                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1960                                         obj_request->bio_list, length);
1961                 else
1962                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1963                                         obj_request->pages, length,
1964                                         offset & ~PAGE_MASK, false, false);
1965
1966                 if (write_request)
1967                         rbd_osd_req_format_write(obj_request);
1968                 else
1969                         rbd_osd_req_format_read(obj_request);
1970
1971                 obj_request->img_offset = img_offset;
1972                 rbd_img_obj_request_add(img_request, obj_request);
1973
1974                 img_offset += length;
1975                 resid -= length;
1976         }
1977
1978         return 0;
1979
1980 out_partial:
1981         rbd_obj_request_put(obj_request);
1982 out_unwind:
1983         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1984                 rbd_obj_request_put(obj_request);
1985
1986         return -ENOMEM;
1987 }
1988
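     /*
      * Callback for a completed copyup request.  Release the page vector
      * that held the parent data, then finish up via the normal image
      * object request callback.
      */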
1989 static void
1990 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1991 {
1992         struct rbd_img_request *img_request;
1993         struct rbd_device *rbd_dev;
1994         u64 length;
1995         u32 page_count;
1996
1997         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1998         rbd_assert(obj_request_img_data_test(obj_request));
1999         img_request = obj_request->img_request;
2000         rbd_assert(img_request);
2001
2002         rbd_dev = img_request->rbd_dev;
2003         rbd_assert(rbd_dev);
2004         length = (u64)1 << rbd_dev->header.obj_order;
2005         page_count = (u32)calc_pages_for(0, length);
2006
2007         rbd_assert(obj_request->copyup_pages);
2008         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2009         obj_request->copyup_pages = NULL;
2010
2011         /*
2012          * We want the transfer count to reflect the size of the
2013          * original write request.  There is no such thing as a
2014          * successful short write, so if the request was successful
2015          * we can just set it to the originally-requested length.
2016          */
2017         if (!obj_request->result)
2018                 obj_request->xferred = obj_request->length;
2019
2020         /* Finish up with the normal image object callback */
2021
2022         rbd_img_obj_callback(obj_request);
2023 }
2024
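     /*
      * Callback for the full-object parent read issued for a layered
      * write.  On success, build the two-op copyup request for the
      * original object request--the copyup method call carrying the
      * parent data, followed by the original write--and submit it.
      */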
2025 static void
2026 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2027 {
2028         struct rbd_obj_request *orig_request;
2029         struct ceph_osd_request *osd_req;
2030         struct ceph_osd_client *osdc;
2031         struct rbd_device *rbd_dev;
2032         struct page **pages;
2033         int result;
2034         u64 obj_size;
2035         u64 xferred;
2036
2037         rbd_assert(img_request_child_test(img_request));
2038
2039         /* First get what we need from the image request */
2040
2041         pages = img_request->copyup_pages;
2042         rbd_assert(pages != NULL);
2043         img_request->copyup_pages = NULL;
2044
2045         orig_request = img_request->obj_request;
2046         rbd_assert(orig_request != NULL);
2047         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2048         result = img_request->result;
2049         obj_size = img_request->length;
2050         xferred = img_request->xferred;
2051
2052         rbd_dev = img_request->rbd_dev;
2053         rbd_assert(rbd_dev);
2054         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2055
2056         rbd_img_request_put(img_request);
2057
2058         if (result)
2059                 goto out_err;
2060
2061         /* Allocate the new copyup osd request for the original request */
2062
2063         result = -ENOMEM;
2064         rbd_assert(!orig_request->osd_req);
2065         osd_req = rbd_osd_req_create_copyup(orig_request);
2066         if (!osd_req)
2067                 goto out_err;
2068         orig_request->osd_req = osd_req;
2069         orig_request->copyup_pages = pages;
2070
2071         /* Initialize the copyup op */
2072
2073         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2074         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2075                                                 false, false);
2076
2077         /* Then the original write request op */
2078
2079         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2080                                         orig_request->offset,
2081                                         orig_request->length, 0, 0);
2082         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2083                                         orig_request->length);
2084
2085         rbd_osd_req_format_write(orig_request);
2086
2087         /* All set, send it off. */
2088
2089         orig_request->callback = rbd_img_obj_copyup_callback;
2090         osdc = &rbd_dev->rbd_client->client->osdc;
2091         result = rbd_obj_request_submit(osdc, orig_request);
2092         if (!result)
2093                 return;
2094 out_err:
2095         /* Record the error code and complete the request */
2096
2097         orig_request->result = result;
2098         orig_request->xferred = 0;
2099         obj_request_done_set(orig_request);
2100         rbd_obj_request_complete(orig_request);
2101 }
2102
2103 /*
2104  * Read from the parent image the range of data that covers the
2105  * entire target of the given object request.  This is used for
2106  * satisfying a layered image write request when the target of an
2107  * object request from the image request does not exist.
2108  *
2109  * A page array big enough to hold the returned data is allocated
2110  * and supplied to rbd_img_request_fill() as the "data descriptor."
2111  * When the read completes, this page array will be transferred to
2112  * the original object request for the copyup operation.
2113  *
2114  * If an error occurs, record it as the result of the original
2115  * object request and mark it done so it gets completed.
2116  */
2117 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2118 {
2119         struct rbd_img_request *img_request = NULL;
2120         struct rbd_img_request *parent_request = NULL;
2121         struct rbd_device *rbd_dev;
2122         u64 img_offset;
2123         u64 length;
2124         struct page **pages = NULL;
2125         u32 page_count;
2126         int result;
2127
2128         rbd_assert(obj_request_img_data_test(obj_request));
2129         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2130
2131         img_request = obj_request->img_request;
2132         rbd_assert(img_request != NULL);
2133         rbd_dev = img_request->rbd_dev;
2134         rbd_assert(rbd_dev->parent != NULL);
2135
2136         /*
2137          * First things first.  The original osd request is of no
2138  * use to us any more, we'll need a new one that can hold
2139          * the two ops in a copyup request.  We'll get that later,
2140          * but for now we can release the old one.
2141          */
2142         rbd_osd_req_destroy(obj_request->osd_req);
2143         obj_request->osd_req = NULL;
2144
2145         /*
2146          * Determine the byte range covered by the object in the
2147          * child image to which the original request was to be sent.
2148          */
2149         img_offset = obj_request->img_offset - obj_request->offset;
2150         length = (u64)1 << rbd_dev->header.obj_order;
2151
2152         /*
2153          * There is no defined parent data beyond the parent
2154          * overlap, so limit what we read at that boundary if
2155          * necessary.
2156          */
2157         if (img_offset + length > rbd_dev->parent_overlap) {
2158                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2159                 length = rbd_dev->parent_overlap - img_offset;
2160         }
2161
2162         /*
2163          * Allocate a page array big enough to receive the data read
2164          * from the parent.
2165          */
2166         page_count = (u32)calc_pages_for(0, length);
2167         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2168         if (IS_ERR(pages)) {
2169                 result = PTR_ERR(pages);
2170                 pages = NULL;
2171                 goto out_err;
2172         }
2173
2174         result = -ENOMEM;
2175         parent_request = rbd_img_request_create(rbd_dev->parent,
2176                                                 img_offset, length,
2177                                                 false, true);
2178         if (!parent_request)
2179                 goto out_err;
2180         rbd_obj_request_get(obj_request);
2181         parent_request->obj_request = obj_request;
2182
2183         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2184         if (result)
2185                 goto out_err;
2186         parent_request->copyup_pages = pages;
2187
2188         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2189         result = rbd_img_request_submit(parent_request);
2190         if (!result)
2191                 return 0;
2192
2193         parent_request->copyup_pages = NULL;
2194         parent_request->obj_request = NULL;
2195         rbd_obj_request_put(obj_request);
2196 out_err:
2197         if (pages)
2198                 ceph_release_page_vector(pages, page_count);
2199         if (parent_request)
2200                 rbd_img_request_put(parent_request);
2201         obj_request->result = result;
2202         obj_request->xferred = 0;
2203         obj_request_done_set(obj_request);
2204
2205         return result;
2206 }
2207
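     /*
      * Callback for the STAT request issued to learn whether the target
      * object of a layered write exists.  Record the existence result,
      * then resubmit the original object request.
      */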
2208 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2209 {
2210         struct rbd_obj_request *orig_request;
2211         int result;
2212
2213         rbd_assert(!obj_request_img_data_test(obj_request));
2214
2215         /*
2216          * All we need from the object request is the original
2217          * request and the result of the STAT op.  Grab those, then
2218          * we're done with the request.
2219          */
2220         orig_request = obj_request->obj_request;
2221         obj_request->obj_request = NULL;
2222         rbd_assert(orig_request);
2223         rbd_assert(orig_request->img_request);
2224
2225         result = obj_request->result;
2226         obj_request->result = 0;
2227
2228         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2229                 obj_request, orig_request, result,
2230                 obj_request->xferred, obj_request->length);
2231         rbd_obj_request_put(obj_request);
2232
2236         /*
2237          * Our only purpose here is to determine whether the object
2238          * exists, and we don't want to treat the non-existence as
2239          * an error.  If something else comes back, transfer the
2240          * error to the original request and complete it now.
2241          */
2242         if (!result) {
2243                 obj_request_existence_set(orig_request, true);
2244         } else if (result == -ENOENT) {
2245                 obj_request_existence_set(orig_request, false);
2246         } else {
2247                 orig_request->result = result;
2248                 goto out;
2249         }
2250
2251         /*
2252          * Resubmit the original request now that we have recorded
2253          * whether the target object exists.
2254          */
2255         orig_request->result = rbd_img_obj_request_submit(orig_request);
2256 out:
2257         if (orig_request->result)
2258                 rbd_obj_request_complete(orig_request);
2259         rbd_obj_request_put(orig_request);
2260 }
2261
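     /*
      * Issue a STAT request for the target object of a layered write to
      * determine whether it already exists in the base image.
      */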
2262 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2263 {
2264         struct rbd_obj_request *stat_request;
2265         struct rbd_device *rbd_dev;
2266         struct ceph_osd_client *osdc;
2267         struct page **pages = NULL;
2268         u32 page_count;
2269         size_t size;
2270         int ret;
2271
2272         /*
2273          * The response data for a STAT call consists of:
2274          *     le64 length;
2275          *     struct {
2276          *         le32 tv_sec;
2277          *         le32 tv_nsec;
2278          *     } mtime;
2279          */
2280         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2281         page_count = (u32)calc_pages_for(0, size);
2282         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2283         if (IS_ERR(pages))
2284                 return PTR_ERR(pages);
2285
2286         ret = -ENOMEM;
2287         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2288                                                         OBJ_REQUEST_PAGES);
2289         if (!stat_request) {
                     ceph_release_page_vector(pages, page_count);
                     return -ENOMEM;
             }
2291
2292         rbd_obj_request_get(obj_request);
2293         stat_request->obj_request = obj_request;
2294         stat_request->pages = pages;
2295         stat_request->page_count = page_count;
2296
2297         rbd_assert(obj_request->img_request);
2298         rbd_dev = obj_request->img_request->rbd_dev;
2299         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2300                                                 stat_request);
2301         if (!stat_request->osd_req)
2302                 goto out;
2303         stat_request->callback = rbd_img_obj_exists_callback;
2304
2305         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2306         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2307                                         false, false);
2308         rbd_osd_req_format_read(stat_request);
2309
2310         osdc = &rbd_dev->rbd_client->client->osdc;
2311         ret = rbd_obj_request_submit(osdc, stat_request);
2312 out:
2313         if (ret) {
                     rbd_obj_request_put(stat_request);
2314                 rbd_obj_request_put(obj_request);
             }
2315
2316         return ret;
2317 }
2318
2319 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2320 {
2321         struct rbd_img_request *img_request;
2322         struct rbd_device *rbd_dev;
2323         bool known;
2324
2325         rbd_assert(obj_request_img_data_test(obj_request));
2326
2327         img_request = obj_request->img_request;
2328         rbd_assert(img_request);
2329         rbd_dev = img_request->rbd_dev;
2330
2331         /*
2332          * Only writes to layered images need special handling.
2333          * Reads and non-layered writes are simple object requests.
2334          * Layered writes that start beyond the end of the overlap
2335          * with the parent have no parent data, so they too are
2336          * simple object requests.  Finally, if the target object is
2337          * known to already exist, its parent data has already been
2338          * copied, so a write to the object can also be handled as a
2339          * simple object request.
2340          */
2341         if (!img_request_write_test(img_request) ||
2342                 !img_request_layered_test(img_request) ||
2343                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2344                 ((known = obj_request_known_test(obj_request)) &&
2345                         obj_request_exists_test(obj_request))) {
2346
2347                 struct rbd_device *rbd_dev;
2348                 struct ceph_osd_client *osdc;
2349
2350                 rbd_dev = obj_request->img_request->rbd_dev;
2351                 osdc = &rbd_dev->rbd_client->client->osdc;
2352
2353                 return rbd_obj_request_submit(osdc, obj_request);
2354         }
2355
2356         /*
2357          * It's a layered write.  The target object might exist but
2358          * we may not know that yet.  If we know it doesn't exist,
2359          * start by reading the data for the full target object from
2360          * the parent so we can use it for a copyup to the target.
2361          */
2362         if (known)
2363                 return rbd_img_obj_parent_read_full(obj_request);
2364
2365         /* We don't know whether the target exists.  Go find out. */
2366
2367         return rbd_img_obj_exists_submit(obj_request);
2368 }
2369
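     /*
      * Submit all of the object requests that make up an image request.
      */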
2370 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2371 {
2372         struct rbd_obj_request *obj_request;
2373         struct rbd_obj_request *next_obj_request;
2374
2375         dout("%s: img %p\n", __func__, img_request);
2376         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2377                 int ret;
2378
2379                 ret = rbd_img_obj_request_submit(obj_request);
2380                 if (ret)
2381                         return ret;
2382         }
2383
2384         return 0;
2385 }
2386
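     /*
      * Callback for a completed read from the parent image.  Transfer
      * the result to the original object request, clamping the transfer
      * count at the parent overlap boundary, then complete it.
      */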
2387 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2388 {
2389         struct rbd_obj_request *obj_request;
2390         struct rbd_device *rbd_dev;
2391         u64 obj_end;
2392
2393         rbd_assert(img_request_child_test(img_request));
2394
2395         obj_request = img_request->obj_request;
2396         rbd_assert(obj_request);
2397         rbd_assert(obj_request->img_request);
2398
2399         obj_request->result = img_request->result;
2400         if (obj_request->result)
2401                 goto out;
2402
2403         /*
2404          * We need to zero anything beyond the parent overlap
2405          * boundary.  Since rbd_img_obj_request_read_callback()
2406          * will zero anything beyond the end of a short read, an
2407          * easy way to do this is to pretend the data from the
2408          * parent came up short--ending at the overlap boundary.
2409          */
2410         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2411         obj_end = obj_request->img_offset + obj_request->length;
2412         rbd_dev = obj_request->img_request->rbd_dev;
2413         if (obj_end > rbd_dev->parent_overlap) {
2414                 u64 xferred = 0;
2415
2416                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2417                         xferred = rbd_dev->parent_overlap -
2418                                         obj_request->img_offset;
2419
2420                 obj_request->xferred = min(img_request->xferred, xferred);
2421         } else {
2422                 obj_request->xferred = img_request->xferred;
2423         }
2424 out:
2425         rbd_img_obj_request_read_callback(obj_request);
2426         rbd_obj_request_complete(obj_request);
2427 }
2428
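     /*
      * The target object of a read from a layered image doesn't exist.
      * Satisfy the read by issuing a child image request for the same
      * range against the parent image.
      */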
2429 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2430 {
2431         struct rbd_device *rbd_dev;
2432         struct rbd_img_request *img_request;
2433         int result;
2434
2435         rbd_assert(obj_request_img_data_test(obj_request));
2436         rbd_assert(obj_request->img_request != NULL);
2437         rbd_assert(obj_request->result == (s32) -ENOENT);
2438         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2439
2440         rbd_dev = obj_request->img_request->rbd_dev;
2441         rbd_assert(rbd_dev->parent != NULL);
2442         /* rbd_read_finish(obj_request, obj_request->length); */
2443         img_request = rbd_img_request_create(rbd_dev->parent,
2444                                                 obj_request->img_offset,
2445                                                 obj_request->length,
2446                                                 false, true);
2447         result = -ENOMEM;
2448         if (!img_request)
2449                 goto out_err;
2450
2451         rbd_obj_request_get(obj_request);
2452         img_request->obj_request = obj_request;
2453
2454         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2455                                         obj_request->bio_list);
2456         if (result)
2457                 goto out_err;
2458
2459         img_request->callback = rbd_img_parent_read_callback;
2460         result = rbd_img_request_submit(img_request);
2461         if (result)
2462                 goto out_err;
2463
2464         return;
2465 out_err:
2466         if (img_request)
2467                 rbd_img_request_put(img_request);
2468         obj_request->result = result;
2469         obj_request->xferred = 0;
2470         obj_request_done_set(obj_request);
2471 }
2472
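     /*
      * Acknowledge a notification received on the header object watch.
      */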
2473 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2474                                    u64 ver, u64 notify_id)
2475 {
2476         struct rbd_obj_request *obj_request;
2477         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2478         int ret;
2479
2480         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2481                                                         OBJ_REQUEST_NODATA);
2482         if (!obj_request)
2483                 return -ENOMEM;
2484
2485         ret = -ENOMEM;
2486         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2487         if (!obj_request->osd_req)
2488                 goto out;
2489         obj_request->callback = rbd_obj_request_put;
2490
2491         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2492                                         notify_id, ver, 0);
2493         rbd_osd_req_format_read(obj_request);
2494
2495         ret = rbd_obj_request_submit(osdc, obj_request);
2496 out:
2497         if (ret)
2498                 rbd_obj_request_put(obj_request);
2499
2500         return ret;
2501 }
2502
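     /*
      * Called when the header object changes.  Refresh the device's
      * image header, then acknowledge the notification.
      */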
2503 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2504 {
2505         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2506         u64 hver;
2507
2508         if (!rbd_dev)
2509                 return;
2510
2511         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2512                 rbd_dev->header_name, (unsigned long long) notify_id,
2513                 (unsigned int) opcode);
2514         (void)rbd_dev_refresh(rbd_dev, &hver);
2515
2516         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2517 }
2518
2519 /*
2520  * Request sync osd watch/unwatch.  The value of "start" determines
2521  * whether a watch request is being initiated or torn down.
2522  */
2523 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2524 {
2525         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2526         struct rbd_obj_request *obj_request;
2527         int ret;
2528
2529         rbd_assert(start ^ !!rbd_dev->watch_event);
2530         rbd_assert(start ^ !!rbd_dev->watch_request);
2531
2532         if (start) {
2533                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2534                                                 &rbd_dev->watch_event);
2535                 if (ret < 0)
2536                         return ret;
2537                 rbd_assert(rbd_dev->watch_event != NULL);
2538         }
2539
2540         ret = -ENOMEM;
2541         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2542                                                         OBJ_REQUEST_NODATA);
2543         if (!obj_request)
2544                 goto out_cancel;
2545
2546         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2547         if (!obj_request->osd_req)
2548                 goto out_cancel;
2549
2550         if (start)
2551                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2552         else
2553                 ceph_osdc_unregister_linger_request(osdc,
2554                                         rbd_dev->watch_request->osd_req);
2555
2556         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2557                                 rbd_dev->watch_event->cookie,
2558                                 rbd_dev->header.obj_version, start);
2559         rbd_osd_req_format_write(obj_request);
2560
2561         ret = rbd_obj_request_submit(osdc, obj_request);
2562         if (ret)
2563                 goto out_cancel;
2564         ret = rbd_obj_request_wait(obj_request);
2565         if (ret)
2566                 goto out_cancel;
2567         ret = obj_request->result;
2568         if (ret)
2569                 goto out_cancel;
2570
2571         /*
2572          * A watch request is set to linger, so the underlying osd
2573          * request won't go away until we unregister it.  We retain
2574          * a pointer to the object request during that time (in
2575          * rbd_dev->watch_request), so we'll keep a reference to
2576          * it.  We'll drop that reference (below) after we've
2577          * unregistered it.
2578          */
2579         if (start) {
2580                 rbd_dev->watch_request = obj_request;
2581
2582                 return 0;
2583         }
2584
2585         /* We have successfully torn down the watch request */
2586
2587         rbd_obj_request_put(rbd_dev->watch_request);
2588         rbd_dev->watch_request = NULL;
2589 out_cancel:
2590         /* Cancel the event if we're tearing down, or on error */
2591         ceph_osdc_cancel_event(rbd_dev->watch_event);
2592         rbd_dev->watch_event = NULL;
2593         if (obj_request)
2594                 rbd_obj_request_put(obj_request);
2595
2596         return ret;
2597 }
2598
2599 /*
2600  * Synchronous osd object method call.  Returns the number of bytes
2601  * returned in the outbound buffer, or a negative error code.
2602  */
2603 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2604                              const char *object_name,
2605                              const char *class_name,
2606                              const char *method_name,
2607                              const void *outbound,
2608                              size_t outbound_size,
2609                              void *inbound,
2610                              size_t inbound_size,
2611                              u64 *version)
2612 {
2613         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2614         struct rbd_obj_request *obj_request;
2615         struct page **pages;
2616         u32 page_count;
2617         int ret;
2618
2619         /*
2620          * Method calls are ultimately read operations.  The result
2621          * should be placed into the inbound buffer provided.  They
2622          * also supply outbound data--parameters for the object
2623          * method.  Currently if this is present it will be a
2624          * snapshot id.
2625          */
2626         page_count = (u32)calc_pages_for(0, inbound_size);
2627         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2628         if (IS_ERR(pages))
2629                 return PTR_ERR(pages);
2630
2631         ret = -ENOMEM;
2632         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2633                                                         OBJ_REQUEST_PAGES);
2634         if (!obj_request)
2635                 goto out;
2636
2637         obj_request->pages = pages;
2638         obj_request->page_count = page_count;
2639
2640         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2641         if (!obj_request->osd_req)
2642                 goto out;
2643
2644         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2645                                         class_name, method_name);
2646         if (outbound_size) {
2647                 struct ceph_pagelist *pagelist;
2648
2649                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2650                 if (!pagelist)
2651                         goto out;
2652
2653                 ceph_pagelist_init(pagelist);
2654                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2655                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2656                                                 pagelist);
2657         }
2658         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2659                                         obj_request->pages, inbound_size,
2660                                         0, false, false);
2661         rbd_osd_req_format_read(obj_request);
2662
2663         ret = rbd_obj_request_submit(osdc, obj_request);
2664         if (ret)
2665                 goto out;
2666         ret = rbd_obj_request_wait(obj_request);
2667         if (ret)
2668                 goto out;
2669
2670         ret = obj_request->result;
2671         if (ret < 0)
2672                 goto out;
2673
2674         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2675         ret = (int)obj_request->xferred;
2676         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2677         if (version)
2678                 *version = obj_request->version;
2679 out:
2680         if (obj_request)
2681                 rbd_obj_request_put(obj_request);
2682         else
2683                 ceph_release_page_vector(pages, page_count);
2684
2685         return ret;
2686 }
2687
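     /*
      * The request function for the rbd block device.  Each block layer
      * request is mapped onto an image request, which is filled with
      * object requests and submitted.
      */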
2688 static void rbd_request_fn(struct request_queue *q)
2689                 __releases(q->queue_lock) __acquires(q->queue_lock)
2690 {
2691         struct rbd_device *rbd_dev = q->queuedata;
2692         bool read_only = rbd_dev->mapping.read_only;
2693         struct request *rq;
2694         int result;
2695
2696         while ((rq = blk_fetch_request(q))) {
2697                 bool write_request = rq_data_dir(rq) == WRITE;
2698                 struct rbd_img_request *img_request;
2699                 u64 offset;
2700                 u64 length;
2701
2702                 /* Ignore any non-FS requests that filter through. */
2703
2704                 if (rq->cmd_type != REQ_TYPE_FS) {
2705                         dout("%s: non-fs request type %d\n", __func__,
2706                                 (int) rq->cmd_type);
2707                         __blk_end_request_all(rq, 0);
2708                         continue;
2709                 }
2710
2711                 /* Ignore/skip any zero-length requests */
2712
2713                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2714                 length = (u64) blk_rq_bytes(rq);
2715
2716                 if (!length) {
2717                         dout("%s: zero-length request\n", __func__);
2718                         __blk_end_request_all(rq, 0);
2719                         continue;
2720                 }
2721
2722                 spin_unlock_irq(q->queue_lock);
2723
2724                 /* Disallow writes to a read-only device */
2725
2726                 if (write_request) {
2727                         result = -EROFS;
2728                         if (read_only)
2729                                 goto end_request;
2730                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2731                 }
2732
2733                 /*
2734                  * Quit early if the mapped snapshot no longer
2735                  * exists.  It's still possible the snapshot will
2736                  * have disappeared by the time our request arrives
2737                  * at the osd, but there's no sense in sending it if
2738                  * we already know.
2739                  */
2740                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2741                         dout("request for non-existent snapshot\n");
2742                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2743                         result = -ENXIO;
2744                         goto end_request;
2745                 }
2746
2747                 result = -EINVAL;
2748                 if (offset && length > U64_MAX - offset + 1) {
2749                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2750                                 offset, length);
2751                         goto end_request;       /* Shouldn't happen */
2752                 }
2753
2754                 result = -ENOMEM;
2755                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2756                                                         write_request, false);
2757                 if (!img_request)
2758                         goto end_request;
2759
2760                 img_request->rq = rq;
2761
2762                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2763                                                 rq->bio);
2764                 if (!result)
2765                         result = rbd_img_request_submit(img_request);
2766                 if (result)
2767                         rbd_img_request_put(img_request);
2768 end_request:
2769                 spin_lock_irq(q->queue_lock);
2770                 if (result < 0) {
2771                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2772                                 write_request ? "write" : "read",
2773                                 length, offset, result);
2774
2775                         __blk_end_request_all(rq, result);
2776                 }
2777         }
2778 }
2779
2780 /*
2781  * A queue callback.  Makes sure that we don't create a bio that spans
2782  * multiple osd objects.  One exception would be a single-page bio,
2783  * which we handle later in bio_chain_clone_range().
2784  */
2785 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2786                           struct bio_vec *bvec)
2787 {
2788         struct rbd_device *rbd_dev = q->queuedata;
2789         sector_t sector_offset;
2790         sector_t sectors_per_obj;
2791         sector_t obj_sector_offset;
2792         int ret;
2793
2794         /*
2795          * Convert the partition-relative bio start sector into a
2796          * device-relative sector, then find how far into its rbd
2797          * object that sector falls.
2798          */
2799         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2800         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2801         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2802
2803         /*
2804          * Compute the number of bytes from that offset to the end
2805          * of the object.  Account for what's already used by the bio.
2806          */
2807         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2808         if (ret > bmd->bi_size)
2809                 ret -= bmd->bi_size;
2810         else
2811                 ret = 0;
2812
2813         /*
2814          * Don't send back more than was asked for.  And if the bio
2815          * was empty, let the whole thing through because:  "Note
2816          * that a block device *must* allow a single page to be
2817          * added to an empty bio."
2818          */
2819         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2820         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2821                 ret = (int) bvec->bv_len;
2822
2823         return ret;
2824 }
2825
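     /*
      * Release the gendisk (and its request queue) associated with an
      * rbd device.
      */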
2826 static void rbd_free_disk(struct rbd_device *rbd_dev)
2827 {
2828         struct gendisk *disk = rbd_dev->disk;
2829
2830         if (!disk)
2831                 return;
2832
2833         rbd_dev->disk = NULL;
2834         if (disk->flags & GENHD_FL_UP) {
2835                 del_gendisk(disk);
2836                 if (disk->queue)
2837                         blk_cleanup_queue(disk->queue);
2838         }
2839         put_disk(disk);
2840 }
2841
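     /*
      * Synchronously read up to "length" bytes from the given object
      * into the buffer provided.  Returns the number of bytes read, or
      * a negative errno.
      */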
2842 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2843                                 const char *object_name,
2844                                 u64 offset, u64 length,
2845                                 void *buf, u64 *version)
2846
2847 {
2848         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2849         struct rbd_obj_request *obj_request;
2850         struct page **pages = NULL;
2851         u32 page_count;
2852         size_t size;
2853         int ret;
2854
2855         page_count = (u32) calc_pages_for(offset, length);
2856         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2857         if (IS_ERR(pages))
2858                 return PTR_ERR(pages);
2859
2860         ret = -ENOMEM;
2861         obj_request = rbd_obj_request_create(object_name, offset, length,
2862                                                         OBJ_REQUEST_PAGES);
2863         if (!obj_request)
2864                 goto out;
2865
2866         obj_request->pages = pages;
2867         obj_request->page_count = page_count;
2868
2869         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2870         if (!obj_request->osd_req)
2871                 goto out;
2872
2873         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2874                                         offset, length, 0, 0);
2875         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2876                                         obj_request->pages,
2877                                         obj_request->length,
2878                                         obj_request->offset & ~PAGE_MASK,
2879                                         false, false);
2880         rbd_osd_req_format_read(obj_request);
2881
2882         ret = rbd_obj_request_submit(osdc, obj_request);
2883         if (ret)
2884                 goto out;
2885         ret = rbd_obj_request_wait(obj_request);
2886         if (ret)
2887                 goto out;
2888
2889         ret = obj_request->result;
2890         if (ret < 0)
2891                 goto out;
2892
2893         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2894         size = (size_t) obj_request->xferred;
2895         ceph_copy_from_page_vector(pages, buf, 0, size);
2896         rbd_assert(size <= (size_t) INT_MAX);
2897         ret = (int) size;
2898         if (version)
2899                 *version = obj_request->version;
2900 out:
2901         if (obj_request)
2902                 rbd_obj_request_put(obj_request);
2903         else
2904                 ceph_release_page_vector(pages, page_count);
2905
2906         return ret;
2907 }
2908
2909 /*
2910  * Read the complete header for the given rbd device.
2911  *
2912  * Returns a pointer to a dynamically-allocated buffer containing
2913  * the complete and validated header.  Caller can pass the address
2914  * of a variable that will be filled in with the version of the
2915  * header object at the time it was read.
2916  *
2917  * Returns a pointer-coded errno if a failure occurs.
2918  */
2919 static struct rbd_image_header_ondisk *
2920 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2921 {
2922         struct rbd_image_header_ondisk *ondisk = NULL;
2923         u32 snap_count = 0;
2924         u64 names_size = 0;
2925         u32 want_count;
2926         int ret;
2927
2928         /*
2929          * The complete header will include an array of its 64-bit
2930          * snapshot ids, followed by the names of those snapshots as
2931          * a contiguous block of NUL-terminated strings.  Note that
2932          * the number of snapshots could change by the time we read
2933          * it in, in which case we re-read it.
2934          */
2935         do {
2936                 size_t size;
2937
2938                 kfree(ondisk);
2939
2940                 size = sizeof (*ondisk);
2941                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2942                 size += names_size;
2943                 ondisk = kmalloc(size, GFP_KERNEL);
2944                 if (!ondisk)
2945                         return ERR_PTR(-ENOMEM);
2946
2947                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2948                                        0, size, ondisk, version);
2949                 if (ret < 0)
2950                         goto out_err;
2951                 if ((size_t)ret < size) {
2952                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2953                                 size, ret);
2954                         ret = -ENXIO;
2955                         goto out_err;
2956                 }
2957                 if (!rbd_dev_ondisk_valid(ondisk)) {
2958                         ret = -ENXIO;
2959                         rbd_warn(rbd_dev, "invalid header");
2960                         goto out_err;
2961                 }
2962
2963                 names_size = le64_to_cpu(ondisk->snap_names_len);
2964                 want_count = snap_count;
2965                 snap_count = le32_to_cpu(ondisk->snap_count);
2966         } while (snap_count != want_count);
2967
2968         return ondisk;
2969
2970 out_err:
2971         kfree(ondisk);
2972
2973         return ERR_PTR(ret);
2974 }
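/*
 * An illustrative pass through the loop above (the numbers are
 * invented for the example, not taken from a real image):
 *
 *   pass 1: snap_count == 0 and names_size == 0, so only the fixed
 *           part of the header is read; it reports, say, two
 *           snapshots and 12 bytes of names.
 *   pass 2: the buffer is reallocated with room for two snapshot
 *           records plus 12 name bytes and re-read; if the reported
 *           count is still two, the loop exits with a complete header.
 */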
2975
2976 /*
2977  * Reload the on-disk header into the in-memory image header.
2978  */
2979 static int rbd_read_header(struct rbd_device *rbd_dev,
2980                            struct rbd_image_header *header)
2981 {
2982         struct rbd_image_header_ondisk *ondisk;
2983         u64 ver = 0;
2984         int ret;
2985
2986         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2987         if (IS_ERR(ondisk))
2988                 return PTR_ERR(ondisk);
2989         ret = rbd_header_from_disk(header, ondisk);
2990         if (ret >= 0)
2991                 header->obj_version = ver;
2992         kfree(ondisk);
2993
2994         return ret;
2995 }
2996
2997 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2998 {
2999         struct rbd_snap *snap;
3000         struct rbd_snap *next;
3001
3002         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3003                 list_del(&snap->node);
3004                 rbd_snap_destroy(snap);
3005         }
3006 }
3007
3008 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3009 {
3010         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3011                 return;
3012
3013         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3014                 sector_t size;
3015
3016                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3017                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3018                 dout("setting size to %llu sectors", (unsigned long long)size);
3019                 set_capacity(rbd_dev->disk, size);
3020         }
3021 }
3022
3023 /*
3024  * Re-read the v1 image header and update the device to match it.
3025  */
3026 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3027 {
3028         int ret;
3029         struct rbd_image_header h;
3030
3031         ret = rbd_read_header(rbd_dev, &h);
3032         if (ret < 0)
3033                 return ret;
3034
3035         down_write(&rbd_dev->header_rwsem);
3036
3037         /* Update image size, and check for resize of mapped image */
3038         rbd_dev->header.image_size = h.image_size;
3039         rbd_update_mapping_size(rbd_dev);
3040
3041         /* rbd_dev->header.object_prefix shouldn't change */
3042         kfree(rbd_dev->header.snap_sizes);
3043         kfree(rbd_dev->header.snap_names);
3044         /* osd requests may still refer to snapc */
3045         ceph_put_snap_context(rbd_dev->header.snapc);
3046
3047         if (hver)
3048                 *hver = h.obj_version;
3049         rbd_dev->header.obj_version = h.obj_version;
3050         rbd_dev->header.image_size = h.image_size;
3051         rbd_dev->header.snapc = h.snapc;
3052         rbd_dev->header.snap_names = h.snap_names;
3053         rbd_dev->header.snap_sizes = h.snap_sizes;
3054         /* Free the extra copy of the object prefix */
3055         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3056                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3057         kfree(h.object_prefix);
3058
3059         ret = rbd_dev_snaps_update(rbd_dev);
3060
3061         up_write(&rbd_dev->header_rwsem);
3062
3063         return ret;
3064 }
3065
3066 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3067 {
3068         int ret;
3069
3070         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3071         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3072         if (rbd_dev->image_format == 1)
3073                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3074         else
3075                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3076         mutex_unlock(&ctl_mutex);
3077         revalidate_disk(rbd_dev->disk);
3078         if (ret)
3079                 rbd_warn(rbd_dev, "got notification but failed to "
3080                            "update snaps: %d\n", ret);
3081
3082         return ret;
3083 }
3084
3085 static int rbd_init_disk(struct rbd_device *rbd_dev)
3086 {
3087         struct gendisk *disk;
3088         struct request_queue *q;
3089         u64 segment_size;
3090
3091         /* create gendisk info */
3092         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3093         if (!disk)
3094                 return -ENOMEM;
3095
3096         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3097                  rbd_dev->dev_id);
3098         disk->major = rbd_dev->major;
3099         disk->first_minor = 0;
3100         disk->fops = &rbd_bd_ops;
3101         disk->private_data = rbd_dev;
3102
3103         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3104         if (!q)
3105                 goto out_disk;
3106
3107         /* We use the default size, but let's be explicit about it. */
3108         blk_queue_physical_block_size(q, SECTOR_SIZE);
3109
3110         /* set io sizes to object size */
3111         segment_size = rbd_obj_bytes(&rbd_dev->header);
3112         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3113         blk_queue_max_segment_size(q, segment_size);
3114         blk_queue_io_min(q, segment_size);
3115         blk_queue_io_opt(q, segment_size);
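        /*
         * Worked example (assuming the common 4 MiB object size,
         * i.e. obj_order 22): segment_size is 4194304 bytes, so
         * the queue advertises 4194304 / 512 = 8192 max hardware
         * sectors and a 4 MiB minimum/optimal I/O size.
         */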
3116
3117         blk_queue_merge_bvec(q, rbd_merge_bvec);
3118         disk->queue = q;
3119
3120         q->queuedata = rbd_dev;
3121
3122         rbd_dev->disk = disk;
3123
3124         return 0;
3125 out_disk:
3126         put_disk(disk);
3127
3128         return -ENOMEM;
3129 }
3130
3131 /*
3132   sysfs
3133 */
3134
3135 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3136 {
3137         return container_of(dev, struct rbd_device, dev);
3138 }
3139
3140 static ssize_t rbd_size_show(struct device *dev,
3141                              struct device_attribute *attr, char *buf)
3142 {
3143         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3144
3145         return sprintf(buf, "%llu\n",
3146                 (unsigned long long)rbd_dev->mapping.size);
3147 }
3148
3149 /*
3150  * Note this shows the features for whatever's mapped, which is not
3151  * necessarily the base image.
3152  */
3153 static ssize_t rbd_features_show(struct device *dev,
3154                              struct device_attribute *attr, char *buf)
3155 {
3156         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3157
3158         return sprintf(buf, "0x%016llx\n",
3159                         (unsigned long long)rbd_dev->mapping.features);
3160 }
3161
3162 static ssize_t rbd_major_show(struct device *dev,
3163                               struct device_attribute *attr, char *buf)
3164 {
3165         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3166
3167         if (rbd_dev->major)
3168                 return sprintf(buf, "%d\n", rbd_dev->major);
3169
3170         return sprintf(buf, "(none)\n");
3172         }
3173
3174 static ssize_t rbd_client_id_show(struct device *dev,
3175                                   struct device_attribute *attr, char *buf)
3176 {
3177         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3178
3179         return sprintf(buf, "client%lld\n",
3180                         ceph_client_id(rbd_dev->rbd_client->client));
3181 }
3182
3183 static ssize_t rbd_pool_show(struct device *dev,
3184                              struct device_attribute *attr, char *buf)
3185 {
3186         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3187
3188         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3189 }
3190
3191 static ssize_t rbd_pool_id_show(struct device *dev,
3192                              struct device_attribute *attr, char *buf)
3193 {
3194         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
3196         return sprintf(buf, "%llu\n",
3197                         (unsigned long long) rbd_dev->spec->pool_id);
3198 }
3199
3200 static ssize_t rbd_name_show(struct device *dev,
3201                              struct device_attribute *attr, char *buf)
3202 {
3203         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
3205         if (rbd_dev->spec->image_name)
3206                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3207
3208         return sprintf(buf, "(unknown)\n");
3209 }
3210
3211 static ssize_t rbd_image_id_show(struct device *dev,
3212                              struct device_attribute *attr, char *buf)
3213 {
3214         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3215
3216         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3217 }
3218
3219 /*
3220  * Shows the name of the currently-mapped snapshot (or
3221  * RBD_SNAP_HEAD_NAME for the base image).
3222  */
3223 static ssize_t rbd_snap_show(struct device *dev,
3224                              struct device_attribute *attr,
3225                              char *buf)
3226 {
3227         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3228
3229         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3230 }
3231
3232 /*
3233  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3234  * for the parent image.  If there is no parent, simply shows
3235  * "(no parent image)".
3236  */
3237 static ssize_t rbd_parent_show(struct device *dev,
3238                              struct device_attribute *attr,
3239                              char *buf)
3240 {
3241         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3242         struct rbd_spec *spec = rbd_dev->parent_spec;
3243         int count;
3244         char *bufp = buf;
3245
3246         if (!spec)
3247                 return sprintf(buf, "(no parent image)\n");
3248
3249         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3250                         (unsigned long long) spec->pool_id, spec->pool_name);
3251         if (count < 0)
3252                 return count;
3253         bufp += count;
3254
3255         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3256                         spec->image_name ? spec->image_name : "(unknown)");
3257         if (count < 0)
3258                 return count;
3259         bufp += count;
3260
3261         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3262                         (unsigned long long) spec->snap_id, spec->snap_name);
3263         if (count < 0)
3264                 return count;
3265         bufp += count;
3266
3267         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3268         if (count < 0)
3269                 return count;
3270         bufp += count;
3271
3272         return (ssize_t) (bufp - buf);
3273 }
3274
3275 static ssize_t rbd_image_refresh(struct device *dev,
3276                                  struct device_attribute *attr,
3277                                  const char *buf,
3278                                  size_t size)
3279 {
3280         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281         int ret;
3282
3283         ret = rbd_dev_refresh(rbd_dev, NULL);
3284
3285         return ret < 0 ? ret : size;
3286 }
3287
3288 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3289 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3290 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3291 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3292 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3293 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3294 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3295 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3296 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3297 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3298 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3299
3300 static struct attribute *rbd_attrs[] = {
3301         &dev_attr_size.attr,
3302         &dev_attr_features.attr,
3303         &dev_attr_major.attr,
3304         &dev_attr_client_id.attr,
3305         &dev_attr_pool.attr,
3306         &dev_attr_pool_id.attr,
3307         &dev_attr_name.attr,
3308         &dev_attr_image_id.attr,
3309         &dev_attr_current_snap.attr,
3310         &dev_attr_parent.attr,
3311         &dev_attr_refresh.attr,
3312         NULL
3313 };
3314
3315 static struct attribute_group rbd_attr_group = {
3316         .attrs = rbd_attrs,
3317 };
3318
3319 static const struct attribute_group *rbd_attr_groups[] = {
3320         &rbd_attr_group,
3321         NULL
3322 };
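/*
 * Illustrative user-space view of the attributes above (device 0
 * is just an example; the bus path follows from the rbd bus
 * registration elsewhere in this file):
 *
 *   $ cat /sys/bus/rbd/devices/0/pool
 *   $ cat /sys/bus/rbd/devices/0/current_snap
 *   $ echo 1 > /sys/bus/rbd/devices/0/refresh
 */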
3323
3324 static void rbd_sysfs_dev_release(struct device *dev)
3325 {
3326 }
3327
3328 static struct device_type rbd_device_type = {
3329         .name           = "rbd",
3330         .groups         = rbd_attr_groups,
3331         .release        = rbd_sysfs_dev_release,
3332 };
3333
3334 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3335 {
3336         kref_get(&spec->kref);
3337
3338         return spec;
3339 }
3340
3341 static void rbd_spec_free(struct kref *kref);
3342 static void rbd_spec_put(struct rbd_spec *spec)
3343 {
3344         if (spec)
3345                 kref_put(&spec->kref, rbd_spec_free);
3346 }
3347
3348 static struct rbd_spec *rbd_spec_alloc(void)
3349 {
3350         struct rbd_spec *spec;
3351
3352         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3353         if (!spec)
3354                 return NULL;
3355         kref_init(&spec->kref);
3356
3357         return spec;
3358 }
3359
3360 static void rbd_spec_free(struct kref *kref)
3361 {
3362         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3363
3364         kfree(spec->pool_name);
3365         kfree(spec->image_id);
3366         kfree(spec->image_name);
3367         kfree(spec->snap_name);
3368         kfree(spec);
3369 }
3370
3371 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3372                                 struct rbd_spec *spec)
3373 {
3374         struct rbd_device *rbd_dev;
3375
3376         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3377         if (!rbd_dev)
3378                 return NULL;
3379
3380         spin_lock_init(&rbd_dev->lock);
3381         rbd_dev->flags = 0;
3382         INIT_LIST_HEAD(&rbd_dev->node);
3383         INIT_LIST_HEAD(&rbd_dev->snaps);
3384         init_rwsem(&rbd_dev->header_rwsem);
3385
3386         rbd_dev->spec = spec;
3387         rbd_dev->rbd_client = rbdc;
3388
3389         /* Initialize the layout used for all rbd requests */
3390
3391         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3392         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3393         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3394         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3395
3396         return rbd_dev;
3397 }
3398
3399 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3400 {
3401         rbd_put_client(rbd_dev->rbd_client);
3402         rbd_spec_put(rbd_dev->spec);
3403         kfree(rbd_dev);
3404 }
3405
3406 static void rbd_snap_destroy(struct rbd_snap *snap)
3407 {
3408         kfree(snap->name);
3409         kfree(snap);
3410 }
3411
3412 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3413                                                 const char *snap_name,
3414                                                 u64 snap_id, u64 snap_size,
3415                                                 u64 snap_features)
3416 {
3417         struct rbd_snap *snap;
3418
3419         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3420         if (!snap)
3421                 return ERR_PTR(-ENOMEM);
3422
3423         snap->name = snap_name;
3424         snap->id = snap_id;
3425         snap->size = snap_size;
3426         snap->features = snap_features;
3427
3428         return snap;
3429 }
3430
3431 /*
3432  * Returns a dynamically-allocated snapshot name if successful, or a
3433  * pointer-coded error otherwise.
3434  */
3435 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3436                 u64 *snap_size, u64 *snap_features)
3437 {
3438         char *snap_name;
3439         int i;
3440
3441         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3442
3443         /* Skip over names until we find the one we are looking for */
3444
3445         snap_name = rbd_dev->header.snap_names;
3446         for (i = 0; i < which; i++)
3447                 snap_name += strlen(snap_name) + 1;
3448
3449         snap_name = kstrdup(snap_name, GFP_KERNEL);
3450         if (!snap_name)
3451                 return ERR_PTR(-ENOMEM);
3452
3453         *snap_size = rbd_dev->header.snap_sizes[which];
3454         *snap_features = 0;     /* No features for v1 */
3455
3456         return snap_name;
3457 }
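/*
 * Example of the layout walked above (names invented): if
 * snap_names holds "alpha\0beta\0", which == 0 returns a copy of
 * "alpha", and which == 1 skips strlen("alpha") + 1 bytes and
 * returns a copy of "beta".
 */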
3458
3459 /*
3460  * Get the size and object order for an image snapshot, or if
3461  * snap_id is CEPH_NOSNAP, gets this information for the base
3462  * image.
3463  */
3464 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3465                                 u8 *order, u64 *snap_size)
3466 {
3467         __le64 snapid = cpu_to_le64(snap_id);
3468         int ret;
3469         struct {
3470                 u8 order;
3471                 __le64 size;
3472         } __attribute__ ((packed)) size_buf = { 0 };
3473
3474         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3475                                 "rbd", "get_size",
3476                                 &snapid, sizeof (snapid),
3477                                 &size_buf, sizeof (size_buf), NULL);
3478         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3479         if (ret < 0)
3480                 return ret;
3481         if (ret < sizeof (size_buf))
3482                 return -ERANGE;
3483
3484         if (order)
3485                 *order = size_buf.order;
3486         *snap_size = le64_to_cpu(size_buf.size);
3487
3488         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3489                 (unsigned long long)snap_id, order ? (unsigned int)*order : 0,
3490                 (unsigned long long)*snap_size);
3491
3492         return 0;
3493 }
3494
3495 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3496 {
3497         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3498                                         &rbd_dev->header.obj_order,
3499                                         &rbd_dev->header.image_size);
3500 }
3501
3502 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3503 {
3504         void *reply_buf;
3505         int ret;
3506         void *p;
3507
3508         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3509         if (!reply_buf)
3510                 return -ENOMEM;
3511
3512         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3513                                 "rbd", "get_object_prefix", NULL, 0,
3514                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3515         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3516         if (ret < 0)
3517                 goto out;
3518
3519         p = reply_buf;
3520         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3521                                                 p + ret, NULL, GFP_NOIO);
3522         ret = 0;
3523
3524         if (IS_ERR(rbd_dev->header.object_prefix)) {
3525                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3526                 rbd_dev->header.object_prefix = NULL;
3527         } else {
3528                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3529         }
3530 out:
3531         kfree(reply_buf);
3532
3533         return ret;
3534 }
3535
3536 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3537                 u64 *snap_features)
3538 {
3539         __le64 snapid = cpu_to_le64(snap_id);
3540         struct {
3541                 __le64 features;
3542                 __le64 incompat;
3543         } __attribute__ ((packed)) features_buf = { 0 };
3544         u64 incompat;
3545         int ret;
3546
3547         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3548                                 "rbd", "get_features",
3549                                 &snapid, sizeof (snapid),
3550                                 &features_buf, sizeof (features_buf), NULL);
3551         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3552         if (ret < 0)
3553                 return ret;
3554         if (ret < sizeof (features_buf))
3555                 return -ERANGE;
3556
3557         incompat = le64_to_cpu(features_buf.incompat);
3558         if (incompat & ~RBD_FEATURES_SUPPORTED)
3559                 return -ENXIO;
3560
3561         *snap_features = le64_to_cpu(features_buf.features);
3562
3563         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3564                 (unsigned long long)snap_id,
3565                 (unsigned long long)*snap_features,
3566                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3567
3568         return 0;
3569 }
3570
3571 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3572 {
3573         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3574                                                 &rbd_dev->header.features);
3575 }
3576
3577 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3578 {
3579         struct rbd_spec *parent_spec;
3580         size_t size;
3581         void *reply_buf = NULL;
3582         __le64 snapid;
3583         void *p;
3584         void *end;
3585         char *image_id;
3586         u64 overlap;
3587         int ret;
3588
3589         parent_spec = rbd_spec_alloc();
3590         if (!parent_spec)
3591                 return -ENOMEM;
3592
3593         size = sizeof (__le64) +                                /* pool_id */
3594                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3595                 sizeof (__le64) +                               /* snap_id */
3596                 sizeof (__le64);                                /* overlap */
3597         reply_buf = kmalloc(size, GFP_KERNEL);
3598         if (!reply_buf) {
3599                 ret = -ENOMEM;
3600                 goto out_err;
3601         }
3602
3603         snapid = cpu_to_le64(CEPH_NOSNAP);
3604         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3605                                 "rbd", "get_parent",
3606                                 &snapid, sizeof (snapid),
3607                                 reply_buf, size, NULL);
3608         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3609         if (ret < 0)
3610                 goto out_err;
3611
3612         p = reply_buf;
3613         end = reply_buf + ret;
3614         ret = -ERANGE;
3615         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3616         if (parent_spec->pool_id == CEPH_NOPOOL)
3617                 goto out;       /* No parent?  No problem. */
3618
3619         /* The ceph file layout needs to fit pool id in 32 bits */
3620
3621         ret = -EIO;
3622         if (parent_spec->pool_id > (u64)U32_MAX) {
3623                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3624                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3625                 goto out_err;
3626         }
3627
3628         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3629         if (IS_ERR(image_id)) {
3630                 ret = PTR_ERR(image_id);
3631                 goto out_err;
3632         }
3633         parent_spec->image_id = image_id;
3634         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3635         ceph_decode_64_safe(&p, end, overlap, out_err);
3636
3637         rbd_dev->parent_overlap = overlap;
3638         rbd_dev->parent_spec = parent_spec;
3639         parent_spec = NULL;     /* rbd_dev now owns this */
3640 out:
3641         ret = 0;
3642 out_err:
3643         kfree(reply_buf);
3644         rbd_spec_put(parent_spec);
3645
3646         return ret;
3647 }
3648
3649 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3650 {
3651         struct {
3652                 __le64 stripe_unit;
3653                 __le64 stripe_count;
3654         } __attribute__ ((packed)) striping_info_buf = { 0 };
3655         size_t size = sizeof (striping_info_buf);
3656         void *p;
3657         u64 obj_size;
3658         u64 stripe_unit;
3659         u64 stripe_count;
3660         int ret;
3661
3662         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3663                                 "rbd", "get_stripe_unit_count", NULL, 0,
3664                                 (char *)&striping_info_buf, size, NULL);
3665         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3666         if (ret < 0)
3667                 return ret;
3668         if (ret < size)
3669                 return -ERANGE;
3670
3671         /*
3672          * We don't actually support the "fancy striping" feature
3673          * (STRIPINGV2) yet, but if the striping sizes are the
3674  * defaults, the behavior is the same as before.  So find
3675          * out, and only fail if the image has non-default values.
3676          */
3677         ret = -EINVAL;
3678         obj_size = (u64)1 << rbd_dev->header.obj_order;
3679         p = &striping_info_buf;
3680         stripe_unit = ceph_decode_64(&p);
3681         if (stripe_unit != obj_size) {
3682                 rbd_warn(rbd_dev, "unsupported stripe unit "
3683                                 "(got %llu want %llu)",
3684                                 stripe_unit, obj_size);
3685                 return -EINVAL;
3686         }
3687         stripe_count = ceph_decode_64(&p);
3688         if (stripe_count != 1) {
3689                 rbd_warn(rbd_dev, "unsupported stripe count "
3690                                 "(got %llu want 1)", stripe_count);
3691                 return -EINVAL;
3692         }
3693         rbd_dev->header.stripe_unit = stripe_unit;
3694         rbd_dev->header.stripe_count = stripe_count;
3695
3696         return 0;
3697 }
3698
3699 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3700 {
3701         size_t image_id_size;
3702         char *image_id;
3703         void *p;
3704         void *end;
3705         size_t size;
3706         void *reply_buf = NULL;
3707         size_t len = 0;
3708         char *image_name = NULL;
3709         int ret;
3710
3711         rbd_assert(!rbd_dev->spec->image_name);
3712
3713         len = strlen(rbd_dev->spec->image_id);
3714         image_id_size = sizeof (__le32) + len;
3715         image_id = kmalloc(image_id_size, GFP_KERNEL);
3716         if (!image_id)
3717                 return NULL;
3718
3719         p = image_id;
3720         end = image_id + image_id_size;
3721         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3722
3723         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3724         reply_buf = kmalloc(size, GFP_KERNEL);
3725         if (!reply_buf)
3726                 goto out;
3727
3728         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3729                                 "rbd", "dir_get_name",
3730                                 image_id, image_id_size,
3731                                 reply_buf, size, NULL);
3732         if (ret < 0)
3733                 goto out;
3734         p = reply_buf;
3735         end = reply_buf + ret;
3736
3737         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3738         if (IS_ERR(image_name))
3739                 image_name = NULL;
3740         else
3741                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3742 out:
3743         kfree(reply_buf);
3744         kfree(image_id);
3745
3746         return image_name;
3747 }
3748
3749 /*
3750  * When an rbd image has a parent image, it is identified by the
3751  * pool, image, and snapshot ids (not names).  This function fills
3752  * in the names for those ids.  (It's OK if we can't figure out the
3753  * name for an image id, but the pool and snapshot ids should always
3754  * exist and have names.)  All names in an rbd spec are dynamically
3755  * allocated.
3756  *
3757  * When an image being mapped (not a parent) is probed, we have the
3758  * pool name and pool id, image name and image id, and the snapshot
3759  * name.  The only thing we're missing is the snapshot id.
3760  *
3761  * The set of snapshots for an image is not known until they have
3762  * been read by rbd_dev_snaps_update(), so we can't completely fill
3763  * in this information until after that has been called.
3764  */
3765 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3766 {
3767         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3768         struct rbd_spec *spec = rbd_dev->spec;
3769         const char *pool_name;
3770         const char *image_name;
3771         const char *snap_name;
3772         int ret;
3773
3774         /*
3775          * An image being mapped will have the pool name (etc.), but
3776          * we need to look up the snapshot id.
3777          */
3778         if (spec->pool_name) {
3779                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3780                         struct rbd_snap *snap;
3781
3782                         snap = snap_by_name(rbd_dev, spec->snap_name);
3783                         if (!snap)
3784                                 return -ENOENT;
3785                         spec->snap_id = snap->id;
3786                 } else {
3787                         spec->snap_id = CEPH_NOSNAP;
3788                 }
3789
3790                 return 0;
3791         }
3792
3793         /* Get the pool name; we have to make our own copy of this */
3794
3795         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3796         if (!pool_name) {
3797                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3798                 return -EIO;
3799         }
3800         pool_name = kstrdup(pool_name, GFP_KERNEL);
3801         if (!pool_name)
3802                 return -ENOMEM;
3803
3804         /* Fetch the image name; tolerate failure here */
3805
3806         image_name = rbd_dev_image_name(rbd_dev);
3807         if (!image_name)
3808                 rbd_warn(rbd_dev, "unable to get image name");
3809
3810         /* Look up the snapshot name, and make a copy */
3811
3812         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3813         if (!snap_name) {
3814                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3815                 ret = -EIO;
3816                 goto out_err;
3817         }
3818         snap_name = kstrdup(snap_name, GFP_KERNEL);
3819         if (!snap_name) {
3820                 ret = -ENOMEM;
3821                 goto out_err;
3822         }
3823
3824         spec->pool_name = pool_name;
3825         spec->image_name = image_name;
3826         spec->snap_name = snap_name;
3827
3828         return 0;
3829 out_err:
3830         kfree(image_name);
3831         kfree(pool_name);
3832
3833         return ret;
3834 }
3835
3836 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3837 {
3838         size_t size;
3839         int ret;
3840         void *reply_buf;
3841         void *p;
3842         void *end;
3843         u64 seq;
3844         u32 snap_count;
3845         struct ceph_snap_context *snapc;
3846         u32 i;
3847
3848         /*
3849          * We'll need room for the seq value (maximum snapshot id),
3850          * snapshot count, and array of that many snapshot ids.
3851          * For now we have a fixed upper limit on the number we're
3852          * prepared to receive.
3853          */
3854         size = sizeof (__le64) + sizeof (__le32) +
3855                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
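        /*
         * Worked out: 8 + 4 + RBD_MAX_SNAP_COUNT (510) * 8 ==
         * 4092 bytes, so the largest snapshot context we are
         * prepared to accept fits in a single 4 KiB allocation.
         */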
3856         reply_buf = kzalloc(size, GFP_KERNEL);
3857         if (!reply_buf)
3858                 return -ENOMEM;
3859
3860         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3861                                 "rbd", "get_snapcontext", NULL, 0,
3862                                 reply_buf, size, ver);
3863         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3864         if (ret < 0)
3865                 goto out;
3866
3867         p = reply_buf;
3868         end = reply_buf + ret;
3869         ret = -ERANGE;
3870         ceph_decode_64_safe(&p, end, seq, out);
3871         ceph_decode_32_safe(&p, end, snap_count, out);
3872
3873         /*
3874          * Make sure the reported number of snapshot ids wouldn't go
3875          * beyond the end of our buffer.  But before checking that,
3876          * make sure the computed size of the snapshot context we
3877          * allocate is representable in a size_t.
3878          */
3879         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3880                                  / sizeof (u64)) {
3881                 ret = -EINVAL;
3882                 goto out;
3883         }
3884         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3885                 goto out;
3886         ret = 0;
3887
3888         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3889         if (!snapc) {
3890                 ret = -ENOMEM;
3891                 goto out;
3892         }
3893         snapc->seq = seq;
3894         for (i = 0; i < snap_count; i++)
3895                 snapc->snaps[i] = ceph_decode_64(&p);
3896
3897         rbd_dev->header.snapc = snapc;
3898
3899         dout("  snap context seq = %llu, snap_count = %u\n",
3900                 (unsigned long long)seq, (unsigned int)snap_count);
3901 out:
3902         kfree(reply_buf);
3903
3904         return ret;
3905 }
3906
3907 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3908 {
3909         size_t size;
3910         void *reply_buf;
3911         __le64 snap_id;
3912         int ret;
3913         void *p;
3914         void *end;
3915         char *snap_name;
3916
3917         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3918         reply_buf = kmalloc(size, GFP_KERNEL);
3919         if (!reply_buf)
3920                 return ERR_PTR(-ENOMEM);
3921
3922         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3923         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3924         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3925                                 "rbd", "get_snapshot_name",
3926                                 &snap_id, sizeof (snap_id),
3927                                 reply_buf, size, NULL);
3928         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3929         if (ret < 0) {
3930                 snap_name = ERR_PTR(ret);
3931                 goto out;
3932         }
3933
3934         p = reply_buf;
3935         end = reply_buf + ret;
3936         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3937         if (IS_ERR(snap_name))
3938                 goto out;
3939
3940         dout("  snap_id 0x%016llx snap_name = %s\n",
3941                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3942 out:
3943         kfree(reply_buf);
3944
3945         return snap_name;
3946 }
3947
3948 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3949                 u64 *snap_size, u64 *snap_features)
3950 {
3951         u64 snap_id;
3952         u64 size;
3953         u64 features;
3954         char *snap_name;
3955         int ret;
3956
3957         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3958         snap_id = rbd_dev->header.snapc->snaps[which];
3959         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3960         if (ret)
3961                 goto out_err;
3962
3963         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3964         if (ret)
3965                 goto out_err;
3966
3967         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3968         if (!IS_ERR(snap_name)) {
3969                 *snap_size = size;
3970                 *snap_features = features;
3971         }
3972
3973         return snap_name;
3974 out_err:
3975         return ERR_PTR(ret);
3976 }
3977
3978 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3979                 u64 *snap_size, u64 *snap_features)
3980 {
3981         if (rbd_dev->image_format == 1)
3982                 return rbd_dev_v1_snap_info(rbd_dev, which,
3983                                         snap_size, snap_features);
3984         if (rbd_dev->image_format == 2)
3985                 return rbd_dev_v2_snap_info(rbd_dev, which,
3986                                         snap_size, snap_features);
3987         return ERR_PTR(-EINVAL);
3988 }
3989
3990 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3991 {
3992         int ret;
3993
3994         down_write(&rbd_dev->header_rwsem);
3995
3996         ret = rbd_dev_v2_image_size(rbd_dev);
3997         if (ret)
3998                 goto out;
3999         rbd_update_mapping_size(rbd_dev);
4000
4001         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4002         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4003         if (ret)
4004                 goto out;
4005         ret = rbd_dev_snaps_update(rbd_dev);
4006         dout("rbd_dev_snaps_update returned %d\n", ret);
4007         if (ret)
4008                 goto out;
4009 out:
4010         up_write(&rbd_dev->header_rwsem);
4011
4012         return ret;
4013 }
4014
4015 /*
4016  * Scan the rbd device's current snapshot list and compare it to the
4017  * newly-received snapshot context.  Remove any existing snapshots
 4018  * not present in the new snapshot context.  Add a new entry for
 4019  * any snapshot in the snapshot context not in the current list.
4020  * And verify there are no changes to snapshots we already know
4021  * about.
4022  *
4023  * Assumes the snapshots in the snapshot context are sorted by
4024  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4025  * are also maintained in that order.)
4026  *
 4027  * Note that any error that occurs while updating the snapshot list
4028  * aborts the update, and the entire list is cleared.  The snapshot
4029  * list becomes inconsistent at that point anyway, so it might as
4030  * well be empty.
4031  */
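/*
 * Illustrative merge (ids invented for the example): with a new
 * context of {12, 7, 3} and a current list of {12, 5, 3}, the
 * walk keeps 12 (present in both), inserts 7 (new, placed before
 * 5), removes 5 (absent from the context), and keeps 3, leaving
 * the list as {12, 7, 3}.
 */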
4032 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4033 {
4034         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4035         const u32 snap_count = snapc->num_snaps;
4036         struct list_head *head = &rbd_dev->snaps;
4037         struct list_head *links = head->next;
4038         u32 index = 0;
4039         int ret = 0;
4040
4041         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4042         while (index < snap_count || links != head) {
4043                 u64 snap_id;
4044                 struct rbd_snap *snap;
4045                 char *snap_name;
4046                 u64 snap_size = 0;
4047                 u64 snap_features = 0;
4048
4049                 snap_id = index < snap_count ? snapc->snaps[index]
4050                                              : CEPH_NOSNAP;
4051                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4052                                      : NULL;
4053                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4054
4055                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4056                         struct list_head *next = links->next;
4057
4058                         /*
4059                          * A previously-existing snapshot is not in
4060                          * the new snap context.
4061                          *
4062                          * If the now-missing snapshot is the one
4063                          * the image represents, clear its existence
4064                          * flag so we can avoid sending any more
4065                          * requests to it.
4066                          */
4067                         if (rbd_dev->spec->snap_id == snap->id)
4068                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4069                         dout("removing %ssnap id %llu\n",
4070                                 rbd_dev->spec->snap_id == snap->id ?
4071                                                         "mapped " : "",
4072                                 (unsigned long long)snap->id);
4073
4074                         list_del(&snap->node);
4075                         rbd_snap_destroy(snap);
4076
4077                         /* Done with this list entry; advance */
4078
4079                         links = next;
4080                         continue;
4081                 }
4082
4083                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4084                                         &snap_size, &snap_features);
4085                 if (IS_ERR(snap_name)) {
4086                         ret = PTR_ERR(snap_name);
4087                         dout("failed to get snap info, error %d\n", ret);
4088                         goto out_err;
4089                 }
4090
4091                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4092                         (unsigned long long)snap_id);
4093                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4094                         struct rbd_snap *new_snap;
4095
4096                         /* We haven't seen this snapshot before */
4097
4098                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4099                                         snap_id, snap_size, snap_features);
4100                         if (IS_ERR(new_snap)) {
4101                                 ret = PTR_ERR(new_snap);
4102                                 dout("  failed to add snap, error %d\n", ret);
                                kfree(snap_name);       /* not consumed on error */
4103                                 goto out_err;
4104                         }
4105
4106                         /* New goes before existing, or at end of list */
4107
4108                         dout("  added snap%s\n", snap ? "" : " at end");
4109                         if (snap)
4110                                 list_add_tail(&new_snap->node, &snap->node);
4111                         else
4112                                 list_add_tail(&new_snap->node, head);
4113                 } else {
4114                         /* Already have this one */
4115
4116                         dout("  already present\n");
4117
4118                         rbd_assert(snap->size == snap_size);
4119                         rbd_assert(!strcmp(snap->name, snap_name));
4120                         rbd_assert(snap->features == snap_features);
4121                         kfree(snap_name);       /* we already hold a copy */
4122                         /* Done with this list entry; advance */
4123
4124                         links = links->next;
4125                 }
4126
4127                 /* Advance to the next entry in the snapshot context */
4128
4129                 index++;
4130         }
4131         dout("%s: done\n", __func__);
4132
4133         return 0;
4134 out_err:
4135         rbd_remove_all_snaps(rbd_dev);
4136
4137         return ret;
4138 }
4139
4140 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4141 {
4142         struct device *dev;
4143         int ret;
4144
4145         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4146
4147         dev = &rbd_dev->dev;
4148         dev->bus = &rbd_bus_type;
4149         dev->type = &rbd_device_type;
4150         dev->parent = &rbd_root_dev;
4151         dev->release = rbd_dev_device_release;
4152         dev_set_name(dev, "%d", rbd_dev->dev_id);
4153         ret = device_register(dev);
4154
4155         mutex_unlock(&ctl_mutex);
4156
4157         return ret;
4158 }
4159
4160 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4161 {
4162         device_unregister(&rbd_dev->dev);
4163 }
4164
4165 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4166
4167 /*
4168  * Get a unique rbd identifier for the given new rbd_dev, and add
4169  * the rbd_dev to the global list.  The minimum rbd id is 1.
4170  */
4171 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4172 {
4173         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4174
4175         spin_lock(&rbd_dev_list_lock);
4176         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4177         spin_unlock(&rbd_dev_list_lock);
4178         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4179                 (unsigned long long) rbd_dev->dev_id);
4180 }
4181
4182 /*
4183  * Remove an rbd_dev from the global list, and record that its
4184  * identifier is no longer in use.
4185  */
4186 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4187 {
4188         struct list_head *tmp;
4189         int rbd_id = rbd_dev->dev_id;
4190         int max_id;
4191
4192         rbd_assert(rbd_id > 0);
4193
4194         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4195                 (unsigned long long) rbd_dev->dev_id);
4196         spin_lock(&rbd_dev_list_lock);
4197         list_del_init(&rbd_dev->node);
4198
4199         /*
4200          * If the id being "put" is not the current maximum, there
4201          * is nothing special we need to do.
4202          */
4203         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4204                 spin_unlock(&rbd_dev_list_lock);
4205                 return;
4206         }
4207
4208         /*
4209          * We need to update the current maximum id.  Search the
4210          * list to find out what it is.  We're more likely to find
4211          * the maximum at the end, so search the list backward.
4212          */
4213         max_id = 0;
4214         list_for_each_prev(tmp, &rbd_dev_list) {
4215                 struct rbd_device *rbd_dev;
4216
4217                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4218                 if (rbd_dev->dev_id > max_id)
4219                         max_id = rbd_dev->dev_id;
4220         }
4221         spin_unlock(&rbd_dev_list_lock);
4222
4223         /*
4224          * The max id could have been updated by rbd_dev_id_get(), in
4225          * which case it now accurately reflects the new maximum.
4226          * Be careful not to overwrite the maximum value in that
4227          * case.
4228          */
4229         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4230         dout("  max dev id has been reset\n");
4231 }
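/*
 * Example of the race the cmpxchg above tolerates (ids invented):
 * with ids {1, 2, 3} in use, putting id 3 recomputes a maximum of
 * 2.  If a concurrent rbd_dev_id_get() has already advanced the
 * maximum to 4, the cmpxchg(&rbd_dev_id_max, 3, 2) fails and 4 is
 * preserved; otherwise 2 is installed.
 */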
4232
4233 /*
4234  * Skips over white space at *buf, and updates *buf to point to the
4235  * first found non-space character (if any). Returns the length of
4236  * the token (string of non-white space characters) found.  Note
4237  * that *buf must be terminated with '\0'.
4238  */
4239 static inline size_t next_token(const char **buf)
4240 {
4241         /*
4242          * These are the characters that produce nonzero for
4243          * isspace() in the "C" and "POSIX" locales.
4244          */
4245         const char *spaces = " \f\n\r\t\v";
4246
4247         *buf += strspn(*buf, spaces);   /* Find start of token */
4248
4249         return strcspn(*buf, spaces);   /* Return token length */
4250 }
4251
4252 /*
4253  * Finds the next token in *buf, and if the provided token buffer is
4254  * big enough, copies the found token into it.  The result, if
4255  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4256  * must be terminated with '\0' on entry.
4257  *
4258  * Returns the length of the token found (not including the '\0').
4259  * Return value will be 0 if no token is found, and it will be >=
4260  * token_size if the token would not fit.
4261  *
4262  * The *buf pointer will be updated to point beyond the end of the
4263  * found token.  Note that this occurs even if the token buffer is
4264  * too small to hold it.
4265  */
4266 static inline size_t copy_token(const char **buf,
4267                                 char *token,
4268                                 size_t token_size)
4269 {
4270         size_t len;
4271
4272         len = next_token(buf);
4273         if (len < token_size) {
4274                 memcpy(token, *buf, len);
4275                 *(token + len) = '\0';
4276         }
4277         *buf += len;
4278
4279         return len;
4280 }
4281
4282 /*
4283  * Finds the next token in *buf, dynamically allocates a buffer big
4284  * enough to hold a copy of it, and copies the token into the new
4285  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4286  * that a duplicate buffer is created even for a zero-length token.
4287  *
4288  * Returns a pointer to the newly-allocated duplicate, or a null
4289  * pointer if memory for the duplicate was not available.  If
4290  * the lenp argument is a non-null pointer, the length of the token
4291  * (not including the '\0') is returned in *lenp.
4292  *
4293  * If successful, the *buf pointer will be updated to point beyond
4294  * the end of the found token.
4295  *
4296  * Note: uses GFP_KERNEL for allocation.
4297  */
4298 static inline char *dup_token(const char **buf, size_t *lenp)
4299 {
4300         char *dup;
4301         size_t len;
4302
4303         len = next_token(buf);
4304         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4305         if (!dup)
4306                 return NULL;
4307         *(dup + len) = '\0';
4308         *buf += len;
4309
4310         if (lenp)
4311                 *lenp = len;
4312
4313         return dup;
4314 }
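/*
 * Tokenizer sketch (input invented): given *buf pointing at
 * "  rbd myimage", dup_token() skips the leading spaces, returns
 * a kmalloc'd copy of "rbd" (length 3 via next_token()), and
 * leaves *buf at " myimage", ready for the next call.
 */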
4315
4316 /*
4317  * Parse the options provided for an "rbd add" (i.e., rbd image
4318  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4319  * and the data written is passed here via a NUL-terminated buffer.
4320  * Returns 0 if successful or an error code otherwise.
4321  *
4322  * The information extracted from these options is recorded in
4323  * the other parameters which return dynamically-allocated
4324  * structures:
4325  *  ceph_opts
4326  *      The address of a pointer that will refer to a ceph options
4327  *      structure.  Caller must release the returned pointer using
4328  *      ceph_destroy_options() when it is no longer needed.
4329  *  rbd_opts
4330  *      Address of an rbd options pointer.  Fully initialized by
4331  *      this function; caller must release with kfree().
4332  *  spec
4333  *      Address of an rbd image specification pointer.  Fully
4334  *      initialized by this function based on parsed options.
4335  *      Caller must release with rbd_spec_put().
4336  *
4337  * The options passed take this form:
 4338  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4339  * where:
4340  *  <mon_addrs>
4341  *      A comma-separated list of one or more monitor addresses.
4342  *      A monitor address is an ip address, optionally followed
4343  *      by a port number (separated by a colon).
4344  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4345  *  <options>
4346  *      A comma-separated list of ceph and/or rbd options.
4347  *  <pool_name>
4348  *      The name of the rados pool containing the rbd image.
4349  *  <image_name>
4350  *      The name of the image in that pool to map.
 4351  *  <snap_name>
 4352  *      An optional snapshot name.  If provided, the mapping will
 4353  *      present data from the image at the time that snapshot was
 4354  *      created.  The image head is used if no snapshot name is
 4355  *      provided.  Snapshot mappings are always read-only.
4356  */
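/*
 * For example (addresses and names invented; see
 * Documentation/ABI/testing/sysfs-bus-rbd for the interface):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage -" \
 *         > /sys/bus/rbd/add
 *
 * maps the head of image "myimage" from pool "mypool", with "-"
 * explicitly requesting no snapshot.
 */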
4357 static int rbd_add_parse_args(const char *buf,
4358                                 struct ceph_options **ceph_opts,
4359                                 struct rbd_options **opts,
4360                                 struct rbd_spec **rbd_spec)
4361 {
4362         size_t len;
4363         char *options;
4364         const char *mon_addrs;
4365         char *snap_name;
4366         size_t mon_addrs_size;
4367         struct rbd_spec *spec = NULL;
4368         struct rbd_options *rbd_opts = NULL;
4369         struct ceph_options *copts;
4370         int ret;
4371
4372         /* The first four tokens are required */
4373
4374         len = next_token(&buf);
4375         if (!len) {
4376                 rbd_warn(NULL, "no monitor address(es) provided");
4377                 return -EINVAL;
4378         }
4379         mon_addrs = buf;
4380         mon_addrs_size = len + 1;
4381         buf += len;
4382
4383         ret = -EINVAL;
4384         options = dup_token(&buf, NULL);
4385         if (!options)
4386                 return -ENOMEM;
4387         if (!*options) {
4388                 rbd_warn(NULL, "no options provided");
4389                 goto out_err;
4390         }
4391
4392         spec = rbd_spec_alloc();
4393         if (!spec)
4394                 goto out_mem;
4395
4396         spec->pool_name = dup_token(&buf, NULL);
4397         if (!spec->pool_name)
4398                 goto out_mem;
4399         if (!*spec->pool_name) {
4400                 rbd_warn(NULL, "no pool name provided");
4401                 goto out_err;
4402         }
4403
4404         spec->image_name = dup_token(&buf, NULL);
4405         if (!spec->image_name)
4406                 goto out_mem;
4407         if (!*spec->image_name) {
4408                 rbd_warn(NULL, "no image name provided");
4409                 goto out_err;
4410         }
4411
4412         /*
4413          * Snapshot name is optional; default is to use "-"
4414          * (indicating the head/no snapshot).
4415          */
4416         len = next_token(&buf);
4417         if (!len) {
4418                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4419                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4420         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4421                 ret = -ENAMETOOLONG;
4422                 goto out_err;
4423         }
4424         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4425         if (!snap_name)
4426                 goto out_mem;
4427         *(snap_name + len) = '\0';
4428         spec->snap_name = snap_name;
4429
4430         /* Initialize all rbd options to the defaults */
4431
4432         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4433         if (!rbd_opts)
4434                 goto out_mem;
4435
4436         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4437
4438         copts = ceph_parse_options(options, mon_addrs,
4439                                         mon_addrs + mon_addrs_size - 1,
4440                                         parse_rbd_opts_token, rbd_opts);
4441         if (IS_ERR(copts)) {
4442                 ret = PTR_ERR(copts);
4443                 goto out_err;
4444         }
4445         kfree(options);
4446
4447         *ceph_opts = copts;
4448         *opts = rbd_opts;
4449         *rbd_spec = spec;
4450
4451         return 0;
4452 out_mem:
4453         ret = -ENOMEM;
4454 out_err:
4455         kfree(rbd_opts);
4456         rbd_spec_put(spec);
4457         kfree(options);
4458
4459         return ret;
4460 }
4461
4462 /*
4463  * An rbd format 2 image has a unique identifier, distinct from the
4464  * name given to it by the user.  Internally, that identifier is
4465  * what's used to specify the names of objects related to the image.
4466  *
4467  * A special "rbd id" object is used to map an rbd image name to its
4468  * id.  If that object doesn't exist, then there is no v2 rbd image
4469  * with the supplied name.
4470  *
4471  * This function will set the given rbd_dev's image_id field if
4472  * it can be determined, and in that case will return 0.  If any
4473  * error occurs, a negative errno will be returned and the rbd_dev's
4474  * image_id field will be left unchanged (and should be NULL).
4475  */
4476 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4477 {
4478         int ret;
4479         size_t size;
4480         char *object_name;
4481         void *response;
4482         char *image_id;
4483
4484         /*
4485          * When probing a parent image, the image id is already
4486          * known (and the image name likely is not).  There's no
4487          * need to fetch the image id again in this case.  We
4488          * do still need to set the image format though.
4489          */
4490         if (rbd_dev->spec->image_id) {
4491                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4492
4493                 return 0;
4494         }
4495
4496         /*
4497          * First, see if the format 2 image id object exists, and if
4498          * so, get the image's persistent id from it.
4499          */
4500         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4501         object_name = kmalloc(size, GFP_NOIO);
4502         if (!object_name)
4503                 return -ENOMEM;
4504         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4505         dout("rbd id object name is %s\n", object_name);
4506
4507         /* Response will be an encoded string, which includes a length */
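        /*
         * That is, the response buffer will hold a little-endian
         * 32-bit length followed by that many bytes of image id;
         * RBD_IMAGE_ID_LEN_MAX bounds only the string portion, hence
         * the extra sizeof (__le32) below.
         */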
4508
4509         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4510         response = kzalloc(size, GFP_NOIO);
4511         if (!response) {
4512                 ret = -ENOMEM;
4513                 goto out;
4514         }
4515
4516         /* If it doesn't exist we'll assume it's a format 1 image */
4517
4518         ret = rbd_obj_method_sync(rbd_dev, object_name,
4519                                 "rbd", "get_id", NULL, 0,
4520                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4521         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4522         if (ret == -ENOENT) {
4523                 image_id = kstrdup("", GFP_KERNEL);
4524                 ret = image_id ? 0 : -ENOMEM;
4525                 if (!ret)
4526                         rbd_dev->image_format = 1;
        /* Note the cast: a negative ret must not be compared as unsigned */
4527         } else if (ret > (int)sizeof (__le32)) {
4528                 void *p = response;
4529
4530                 image_id = ceph_extract_encoded_string(&p, p + ret,
4531                                                 NULL, GFP_NOIO);
4532                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4533                 if (!ret)
4534                         rbd_dev->image_format = 2;
4535         } else if (ret >= 0) {
4536                 ret = -EINVAL;  /* reply too short to hold an encoded string */
4537         }
4538
4539         if (!ret) {
4540                 rbd_dev->spec->image_id = image_id;
4541                 dout("image_id is %s\n", image_id);
4542         }
4543 out:
4544         kfree(response);
4545         kfree(object_name);
4546
4547         return ret;
4548 }
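
/*
 * Example: for a format 2 image named "myimage" (a hypothetical name),
 * the id object consulted above is named RBD_ID_PREFIX "myimage", and
 * the id its "get_id" method returns is what all other object names
 * for the image are built from.
 */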
4549
4550 /* Undo whatever state changes were made by a v1 or v2 image probe */
4551
4552 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4553 {
4554         struct rbd_image_header *header;
4555
4556         rbd_dev_remove_parent(rbd_dev);
4557         rbd_spec_put(rbd_dev->parent_spec);
4558         rbd_dev->parent_spec = NULL;
4559         rbd_dev->parent_overlap = 0;
4560
4561         /* Free dynamic fields from the header, then zero it out */
4562
4563         header = &rbd_dev->header;
4564         ceph_put_snap_context(header->snapc);
4565         kfree(header->snap_sizes);
4566         kfree(header->snap_names);
4567         kfree(header->object_prefix);
4568         memset(header, 0, sizeof (*header));
4569 }
4570
4571 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4572 {
4573         int ret;
4574
4575         /* Populate rbd image metadata */
4576
4577         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4578         if (ret < 0)
4579                 goto out_err;
4580
4581         /* Version 1 images have no parent (no layering) */
4582
4583         rbd_dev->parent_spec = NULL;
4584         rbd_dev->parent_overlap = 0;
4585
4586         dout("discovered version 1 image, header name is %s\n",
4587                 rbd_dev->header_name);
4588
4589         return 0;
4590
4591 out_err:
4592         kfree(rbd_dev->header_name);
4593         rbd_dev->header_name = NULL;
4594         kfree(rbd_dev->spec->image_id);
4595         rbd_dev->spec->image_id = NULL;
4596
4597         return ret;
4598 }
4599
4600 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4601 {
4602         int ret;
4603         u64 ver = 0;
4604
4605         ret = rbd_dev_v2_image_size(rbd_dev);
4606         if (ret)
4607                 goto out_err;
4608
4609         /* Get the object prefix (a.k.a. block_name) for the image */
4610
4611         ret = rbd_dev_v2_object_prefix(rbd_dev);
4612         if (ret)
4613                 goto out_err;
4614
4615         /* Get and check the features for the image */
4616
4617         ret = rbd_dev_v2_features(rbd_dev);
4618         if (ret)
4619                 goto out_err;
4620
4621         /* If the image supports layering, get the parent info */
4622
4623         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4624                 ret = rbd_dev_v2_parent_info(rbd_dev);
4625                 if (ret)
4626                         goto out_err;
4627                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4628                                         "layered rbd images is EXPERIMENTAL!");
4629         }
4630
4631         /* If the image supports fancy striping, get its parameters */
4632
4633         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4634                 ret = rbd_dev_v2_striping_info(rbd_dev);
4635                 if (ret < 0)
4636                         goto out_err;
4637         }
4638
4639         /* crypto and compression type aren't (yet) supported for v2 images */
4640
4641         rbd_dev->header.crypt_type = 0;
4642         rbd_dev->header.comp_type = 0;
4643
4644         /* Get the snapshot context, plus the header version */
4645
4646         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4647         if (ret)
4648                 goto out_err;
4649         rbd_dev->header.obj_version = ver;
4650
4651         dout("discovered version 2 image, header name is %s\n",
4652                 rbd_dev->header_name);
4653
4654         return 0;
4655 out_err:
4656         rbd_dev->parent_overlap = 0;
4657         rbd_spec_put(rbd_dev->parent_spec);
4658         rbd_dev->parent_spec = NULL;
4659         kfree(rbd_dev->header_name);
4660         rbd_dev->header_name = NULL;
4661         kfree(rbd_dev->header.object_prefix);
4662         rbd_dev->header.object_prefix = NULL;
4663
4664         return ret;
4665 }
4666
4667 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4668 {
4669         struct rbd_device *parent = NULL;
4670         struct rbd_spec *parent_spec;
4671         struct rbd_client *rbdc;
4672         int ret;
4673
4674         if (!rbd_dev->parent_spec)
4675                 return 0;
4676         /*
4677          * We need to pass a reference to the client and the parent
4678          * spec when creating the parent rbd_dev.  Images related by
4679          * parent/child relationships always share both.
4680          */
4681         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4682         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4683
4684         ret = -ENOMEM;
4685         parent = rbd_dev_create(rbdc, parent_spec);
4686         if (!parent)
4687                 goto out_err;
4688
4689         ret = rbd_dev_image_probe(parent);
4690         if (ret < 0)
4691                 goto out_err;
4692         rbd_dev->parent = parent;
4693
4694         return 0;
4695 out_err:
4696         if (parent) {
4697                 rbd_spec_put(rbd_dev->parent_spec);
4698                 kfree(rbd_dev->header_name);
                rbd_dev->header_name = NULL;    /* caller's error path frees it too */
4699                 rbd_dev_destroy(parent);
4700         } else {
4701                 rbd_put_client(rbdc);
4702                 rbd_spec_put(parent_spec);
4703         }
4704
4705         return ret;
4706 }
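
/*
 * A layering note: rbd_dev_image_probe() calls this function, and this
 * function calls rbd_dev_image_probe() for the parent, so a chain of
 * clones is probed recursively and ends up as a matching chain of
 * rbd_device structures linked through ->parent.  See
 * rbd_dev_remove_parent() below for the teardown of that chain.
 */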
4707
4708 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4709 {
4710         int ret;
4711
4712         ret = rbd_dev_mapping_set(rbd_dev);
4713         if (ret)
4714                 return ret;
4715
4716         /* generate unique id: find highest unique id, add one */
4717         rbd_dev_id_get(rbd_dev);
4718
4719         /* Fill in the device name, now that we have its id. */
4720         BUILD_BUG_ON(DEV_NAME_LEN
4721                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4722         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4723
4724         /* Get our block major device number. */
4725
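        /*
         * Passing 0 as the requested major asks register_blkdev() to
         * pick an unused major number dynamically and return it.
         */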
4726         ret = register_blkdev(0, rbd_dev->name);
4727         if (ret < 0)
4728                 goto err_out_id;
4729         rbd_dev->major = ret;
4730
4731         /* Set up the blkdev mapping. */
4732
4733         ret = rbd_init_disk(rbd_dev);
4734         if (ret)
4735                 goto err_out_blkdev;
4736
4737         ret = rbd_bus_add_dev(rbd_dev);
4738         if (ret)
4739                 goto err_out_disk;
4740
4741         /* Everything's ready.  Announce the disk to the world. */
4742
4743         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4744         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4745         add_disk(rbd_dev->disk);
4746
4747         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4748                 (unsigned long long) rbd_dev->mapping.size);
4749
4750         return ret;
4751
4752 err_out_disk:
4753         rbd_free_disk(rbd_dev);
4754 err_out_blkdev:
4755         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4756 err_out_id:
4757         rbd_dev_id_put(rbd_dev);
4758         rbd_dev_mapping_clear(rbd_dev);
4759
4760         return ret;
4761 }
4762
4763 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4764 {
4765         struct rbd_spec *spec = rbd_dev->spec;
4766         size_t size;
4767
4768         /* Record the header object name for this rbd image. */
4769
4770         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4771
4772         if (rbd_dev->image_format == 1)
4773                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4774         else
4775                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4776
4777         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4778         if (!rbd_dev->header_name)
4779                 return -ENOMEM;
4780
4781         if (rbd_dev->image_format == 1)
4782                 sprintf(rbd_dev->header_name, "%s%s",
4783                         spec->image_name, RBD_SUFFIX);
4784         else
4785                 sprintf(rbd_dev->header_name, "%s%s",
4786                         RBD_HEADER_PREFIX, spec->image_id);
4787         return 0;
4788 }
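
/*
 * So, for a hypothetical image named "myimage": a format 1 device
 * watches the header object "myimage" RBD_SUFFIX, while a format 2
 * device watches RBD_HEADER_PREFIX "<image_id>" (both macros come
 * from rbd_types.h).
 */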
4789
4790 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4791 {
4792         int ret;
4793
4794         rbd_remove_all_snaps(rbd_dev);
4795         rbd_dev_unprobe(rbd_dev);
4796         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4797         if (ret)
4798                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4799         kfree(rbd_dev->header_name);
4800         rbd_dev->header_name = NULL;
4801         rbd_dev->image_format = 0;
4802         kfree(rbd_dev->spec->image_id);
4803         rbd_dev->spec->image_id = NULL;
4804
4805         rbd_dev_destroy(rbd_dev);
4806 }
4807
4808 /*
4809  * Probe for the existence of the header object for the given rbd
4810  * device.  For format 2 images this includes determining the image
4811  * id.
4812  */
4813 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4814 {
4815         int ret;
4816         int tmp;
4817
4818         /*
4819          * Get the id from the image id object.  If it's not a
4820          * format 2 image, we'll get ENOENT back, and we'll assume
4821          * it's a format 1 image.
4822          */
4823         ret = rbd_dev_image_id(rbd_dev);
4824         if (ret)
4825                 return ret;
4826         rbd_assert(rbd_dev->spec->image_id);
4827         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4828
4829         ret = rbd_dev_header_name(rbd_dev);
4830         if (ret)
4831                 goto err_out_format;
4832
4833         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4834         if (ret)
4835                 goto out_header_name;
4836
4837         if (rbd_dev->image_format == 1)
4838                 ret = rbd_dev_v1_probe(rbd_dev);
4839         else
4840                 ret = rbd_dev_v2_probe(rbd_dev);
4841         if (ret)
4842                 goto err_out_watch;
4843
4844         ret = rbd_dev_snaps_update(rbd_dev);
4845         if (ret)
4846                 goto err_out_probe;
4847
4848         ret = rbd_dev_spec_update(rbd_dev);
4849         if (ret)
4850                 goto err_out_snaps;
4851
4852         ret = rbd_dev_probe_parent(rbd_dev);
4853         if (!ret)
4854                 return 0;
4855
4856 err_out_snaps:
4857         rbd_remove_all_snaps(rbd_dev);
4858 err_out_probe:
4859         rbd_dev_unprobe(rbd_dev);
4860 err_out_watch:
4861         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4862         if (tmp)
4863                 rbd_warn(rbd_dev, "unable to tear down watch request (%d)", tmp);
4864 out_header_name:
4865         kfree(rbd_dev->header_name);
4866         rbd_dev->header_name = NULL;
4867 err_out_format:
4868         rbd_dev->image_format = 0;
4869         kfree(rbd_dev->spec->image_id);
4870         rbd_dev->spec->image_id = NULL;
4871
4872         dout("probe failed, returning %d\n", ret);
4873
4874         return ret;
4875 }
4876
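/*
 * rbd_add() handles writes to /sys/bus/rbd/add.  A hypothetical
 * invocation, with illustrative values only (see
 * Documentation/ABI/testing/sysfs-bus-rbd for the real syntax):
 *
 *      # echo "1.2.3.4:6789 name=admin,secret=AQD... rbd myimage -" \
 *              > /sys/bus/rbd/add
 *
 * On success the disk shows up as /dev/rbd<id>, where <id> is the
 * device id assigned in rbd_dev_device_setup().
 */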
4877 static ssize_t rbd_add(struct bus_type *bus,
4878                        const char *buf,
4879                        size_t count)
4880 {
4881         struct rbd_device *rbd_dev = NULL;
4882         struct ceph_options *ceph_opts = NULL;
4883         struct rbd_options *rbd_opts = NULL;
4884         struct rbd_spec *spec = NULL;
4885         struct rbd_client *rbdc;
4886         struct ceph_osd_client *osdc;
4887         int rc = -ENOMEM;
4888
4889         if (!try_module_get(THIS_MODULE))
4890                 return -ENODEV;
4891
4892         /* parse add command */
4893         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4894         if (rc < 0)
4895                 goto err_out_module;
4896
4897         rbdc = rbd_get_client(ceph_opts);
4898         if (IS_ERR(rbdc)) {
4899                 rc = PTR_ERR(rbdc);
4900                 goto err_out_args;
4901         }
4902         ceph_opts = NULL;       /* rbd_dev client now owns this */
4903
4904         /* pick the pool */
4905         osdc = &rbdc->client->osdc;
4906         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4907         if (rc < 0)
4908                 goto err_out_client;
4909         spec->pool_id = (u64)rc;
4910
4911         /* The ceph file layout needs to fit pool id in 32 bits */
4912
4913         if (spec->pool_id > (u64)U32_MAX) {
4914                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4915                                 (unsigned long long)spec->pool_id, U32_MAX);
4916                 rc = -EIO;
4917                 goto err_out_client;
4918         }
4919
4920         rbd_dev = rbd_dev_create(rbdc, spec);
4921         if (!rbd_dev)
4922                 goto err_out_client;
4923         rbdc = NULL;            /* rbd_dev now owns this */
4924         spec = NULL;            /* rbd_dev now owns this */
4925
4926         rbd_dev->mapping.read_only = rbd_opts->read_only;
4927         kfree(rbd_opts);
4928         rbd_opts = NULL;        /* done with this */
4929
4930         rc = rbd_dev_image_probe(rbd_dev);
4931         if (rc < 0)
4932                 goto err_out_rbd_dev;
4933
4934         rc = rbd_dev_device_setup(rbd_dev);
4935         if (!rc)
4936                 return count;
4937
4938         rbd_dev_image_release(rbd_dev);
        /*
         * rbd_dev_image_release() destroys rbd_dev itself, so don't
         * fall through and destroy it a second time.
         */
        goto err_out_module;
4939 err_out_rbd_dev:
4940         rbd_dev_destroy(rbd_dev);
4941 err_out_client:
4942         rbd_put_client(rbdc);
4943 err_out_args:
4944         if (ceph_opts)
4945                 ceph_destroy_options(ceph_opts);
4946         kfree(rbd_opts);
4947         rbd_spec_put(spec);
4948 err_out_module:
4949         module_put(THIS_MODULE);
4950
4951         dout("Error adding device %s\n", buf);
4952
4953         return (ssize_t)rc;
4954 }
4955
4956 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4957 {
4958         struct list_head *tmp;
4959         struct rbd_device *rbd_dev;
4960
4961         spin_lock(&rbd_dev_list_lock);
4962         list_for_each(tmp, &rbd_dev_list) {
4963                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4964                 if (rbd_dev->dev_id == dev_id) {
4965                         spin_unlock(&rbd_dev_list_lock);
4966                         return rbd_dev;
4967                 }
4968         }
4969         spin_unlock(&rbd_dev_list_lock);
4970         return NULL;
4971 }
4972
4973 static void rbd_dev_device_release(struct device *dev)
4974 {
4975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4976
4977         rbd_free_disk(rbd_dev);
4978         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4979         rbd_dev_clear_mapping(rbd_dev);
4980         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4981         rbd_dev->major = 0;
4982         rbd_dev_id_put(rbd_dev);
4983         rbd_dev_mapping_clear(rbd_dev);
4984 }
4985
4986 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4987 {
4988         while (rbd_dev->parent) {
4989                 struct rbd_device *first = rbd_dev;
4990                 struct rbd_device *second = first->parent;
4991                 struct rbd_device *third;
4992
4993                 /*
4994                  * Follow to the parent with no grandparent and
4995                  * remove it.
4996                  */
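                /*
                 * (E.g. with a chain a -> b -> c, c is released on the
                 * first pass of the outer loop, b on the second, after
                 * which a's parent pointer is clear and the loop ends.)
                 */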
4997                 while (second && (third = second->parent)) {
4998                         first = second;
4999                         second = third;
5000                 }
5001                 rbd_assert(second);
5002                 rbd_dev_image_release(second);
5003                 first->parent = NULL;
5004                 first->parent_overlap = 0;
5005
5006                 rbd_assert(first->parent_spec);
5007                 rbd_spec_put(first->parent_spec);
5008                 first->parent_spec = NULL;
5009         }
5010 }
5011
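/*
 * rbd_remove() handles writes to /sys/bus/rbd/remove.  The single
 * token is the device id rbd_add() assigned, e.g. (illustrative):
 *
 *      # echo 0 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is held open.
 */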
5012 static ssize_t rbd_remove(struct bus_type *bus,
5013                           const char *buf,
5014                           size_t count)
5015 {
5016         struct rbd_device *rbd_dev = NULL;
5017         int target_id;
5018         unsigned long ul;
5019         int ret;
5020
5021         ret = strict_strtoul(buf, 10, &ul);
5022         if (ret)
5023                 return ret;
5024
5025         /* convert to int; abort if we lost anything in the conversion */
5026         target_id = (int) ul;
5027         if (target_id != ul)
5028                 return -EINVAL;
5029
5030         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5031
5032         rbd_dev = __rbd_get_dev(target_id);
5033         if (!rbd_dev) {
5034                 ret = -ENOENT;
5035                 goto done;
5036         }
5037
5038         spin_lock_irq(&rbd_dev->lock);
5039         if (rbd_dev->open_count)
5040                 ret = -EBUSY;
5041         else
5042                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5043         spin_unlock_irq(&rbd_dev->lock);
5044         if (ret < 0)
5045                 goto done;
5046         ret = count;
5047         rbd_bus_del_dev(rbd_dev);
5048         rbd_dev_image_release(rbd_dev);
5049         module_put(THIS_MODULE);
5050 done:
5051         mutex_unlock(&ctl_mutex);
5052
5053         return ret;
5054 }
5055
5056 /*
5057  * create control files in sysfs
5058  * /sys/bus/rbd/...
5059  */
5060 static int rbd_sysfs_init(void)
5061 {
5062         int ret;
5063
5064         ret = device_register(&rbd_root_dev);
5065         if (ret < 0)
5066                 return ret;
5067
5068         ret = bus_register(&rbd_bus_type);
5069         if (ret < 0)
5070                 device_unregister(&rbd_root_dev);
5071
5072         return ret;
5073 }
5074
5075 static void rbd_sysfs_cleanup(void)
5076 {
5077         bus_unregister(&rbd_bus_type);
5078         device_unregister(&rbd_root_dev);
5079 }
5080
5081 static int __init rbd_init(void)
5082 {
5083         int rc;
5084
5085         if (!libceph_compatible(NULL)) {
5086                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5087
5088                 return -EINVAL;
5089         }
5090         rc = rbd_sysfs_init();
5091         if (rc)
5092                 return rc;
5093         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5094         return 0;
5095 }
5096
5097 static void __exit rbd_exit(void)
5098 {
5099         rbd_sysfs_cleanup();
5100 }
5101
5102 module_init(rbd_init);
5103 module_exit(rbd_exit);
5104
5105 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5106 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5107 MODULE_DESCRIPTION("rados block device");
5108
5109 /* following authorship retained from original osdblk.c */
5110 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5111
5112 MODULE_LICENSE("GPL");