
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
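        /*
         * Note: treating the result as unsigned means a negative
         * (i.e. overflowed) counter value compares greater than
         * INT_MAX, so overflow falls through to the -EINVAL path.
         */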
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR            256
#define RBD_SINGLE_MAJOR_PART_SHIFT     4

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
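/*
 * Snapshot device names carry the "snap_" prefix, so the snapshot name
 * itself must fit within NAME_MAX less the prefix length.
 */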
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
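/*
 * Each byte of an int contributes at most log10(256) (just under 2.5)
 * decimal digits, hence (5 * sizeof (int)) / 2, plus one extra byte
 * (presumably for a sign).
 */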
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
        OBJ_OP_WRITE,
        OBJ_OP_READ,
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        struct list_head        rq_queue;       /* incoming rq queue */
        spinlock_t              lock;           /* queue, flags, open_count */
        struct workqueue_struct *rq_wq;
        struct work_struct      rq_work;

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

/*
 * Default to false for now, as single-major requires version >= 0.75
 * of the userspace rbd utility.
 */
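/* Can be set at module load time, e.g.: modprobe rbd single_major=Y */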
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

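/*
 * In single-major mode each device id is given a block of
 * 2^RBD_SINGLE_MAJOR_PART_SHIFT (16) minors: the first for the
 * whole device, the remainder for its partitions.
 */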
static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
        int ret = 0;
        int val;
        bool ro;
        bool ro_changed = false;

        /* get_user() may sleep, so call it before taking rbd_dev->lock */
        if (get_user(val, (int __user *)(arg)))
                return -EFAULT;

        ro = val ? true : false;
        /* A snapshot mapping can't be made writable */
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        /* refuse to change the mapping while anyone else has the device open */
        if (rbd_dev->open_count > 1) {
                ret = -EBUSY;
                goto out;
        }

        if (rbd_dev->mapping.read_only != ro) {
                rbd_dev->mapping.read_only = ro;
                ro_changed = true;
        }

out:
        spin_unlock_irq(&rbd_dev->lock);
        /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
        if (ret == 0 && ro_changed)
                set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

        return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        int ret = 0;

        switch (cmd) {
        case BLKROSET:
                ret = rbd_ioctl_set_ro(rbd_dev, arg);
                break;
        default:
                ret = -ENOTTY;
        }

        return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
                                unsigned int cmd, unsigned long arg)
{
        return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
        .ioctl                  = rbd_ioctl,
#ifdef CONFIG_COMPAT
        .compat_ioctl           = rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_client;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;
out_client:
        ceph_destroy_client(rbdc->client);
out_rbdc:
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

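        /* A client created with CEPH_OPT_NOSHARE set is never shared */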
        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

static char *obj_op_name(enum obj_operation_type op_type)
{
        switch (op_type) {
        case OBJ_OP_READ:
                return "read";
        case OBJ_OP_WRITE:
                return "write";
        default:
                return "???";
        }
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
        mutex_unlock(&client_mutex);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes and releases rbd_client_list_lock itself to remove the client
 * from the client list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;
        char *name_format;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
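        /*
         * Format 1 object names encode the segment number in 12 hex
         * digits; format 2 names use 16.
         */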
        name_format = "%s.%012llx";
        if (rbd_dev->image_format == 2)
                name_format = "%s.%016llx";
        ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                rbd_segment_name_free(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

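        /* Clamp the length so the extent doesn't cross a segment boundary */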
        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec bv;
        struct bvec_iter iter;
        unsigned long flags;
        void *buf;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, iter) {
                        if (pos + bv.bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(&bv, &flags);
                                memset(buf + remainder, 0,
                                       bv.bv_len - remainder);
                                flush_dcache_page(bv.bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv.bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = offset & ~PAGE_MASK;
                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bio;

        bio = bio_clone(bio_src, gfpmask);
        if (!bio)
                return NULL;    /* ENOMEM */

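        /* Skip to the start of the range, then trim the clone to len bytes */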
        bio_advance(bio, offset);
        bio->bi_iter.bi_size = len;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_iter.bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_iter.bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
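        /*
         * The barrier here (and in the other flag test helpers) is
         * presumably meant to pair with the barriers on the setting
         * side, so a flag set on another CPU is seen here.
         */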
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

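        /*
         * The overlap is rounded up to object granularity: an object
         * that overlaps the parent at all is treated as overlapping.
         */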
        return obj_request->img_offset <
            round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
             atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

1543 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1544                                 struct rbd_obj_request *obj_request)
1545 {
1546         dout("%s %p\n", __func__, obj_request);
1547         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1548 }
1549
1550 static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1551 {
1552         dout("%s %p\n", __func__, obj_request);
1553         ceph_osdc_cancel_request(obj_request->osd_req);
1554 }
1555
1556 /*
1557  * Wait for an object request to complete.  If interrupted, cancel the
1558  * underlying osd request.
1559  */
1560 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1561 {
1562         int ret;
1563
1564         dout("%s %p\n", __func__, obj_request);
1565
1566         ret = wait_for_completion_interruptible(&obj_request->completion);
1567         if (ret < 0) {
1568                 dout("%s %p interrupted\n", __func__, obj_request);
1569                 rbd_obj_request_end(obj_request);
1570                 return ret;
1571         }
1572
1573         dout("%s %p done\n", __func__, obj_request);
1574         return 0;
1575 }
1576
1577 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1578 {
1579
1580         dout("%s: img %p\n", __func__, img_request);
1581
1582         /*
1583          * If no error occurred, compute the aggregate transfer
1584          * count for the image request.  We could instead use
1585          * atomic64_cmpxchg() to update it as each object request
1586          * completes; it's not clear offhand which way is better.
1587          */
1588         if (!img_request->result) {
1589                 struct rbd_obj_request *obj_request;
1590                 u64 xferred = 0;
1591
1592                 for_each_obj_request(img_request, obj_request)
1593                         xferred += obj_request->xferred;
1594                 img_request->xferred = xferred;
1595         }
1596
1597         if (img_request->callback)
1598                 img_request->callback(img_request);
1599         else
1600                 rbd_img_request_put(img_request);
1601 }
1602
1603 /*
1604  * The default/initial value for all image request flags is 0.  Each is
1605  * conditionally set to 1 at initialization and never changes thereafter.
1606  * The smp_mb() calls order flag accesses with the request's other fields.
1607  */
1608 static void img_request_write_set(struct rbd_img_request *img_request)
1609 {
1610         set_bit(IMG_REQ_WRITE, &img_request->flags);
1611         smp_mb();
1612 }
1613
1614 static bool img_request_write_test(struct rbd_img_request *img_request)
1615 {
1616         smp_mb();
1617         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1618 }
1619
1620 static void img_request_child_set(struct rbd_img_request *img_request)
1621 {
1622         set_bit(IMG_REQ_CHILD, &img_request->flags);
1623         smp_mb();
1624 }
1625
1626 static void img_request_child_clear(struct rbd_img_request *img_request)
1627 {
1628         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1629         smp_mb();
1630 }
1631
1632 static bool img_request_child_test(struct rbd_img_request *img_request)
1633 {
1634         smp_mb();
1635         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1636 }
1637
1638 static void img_request_layered_set(struct rbd_img_request *img_request)
1639 {
1640         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1641         smp_mb();
1642 }
1643
1644 static void img_request_layered_clear(struct rbd_img_request *img_request)
1645 {
1646         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1647         smp_mb();
1648 }
1649
1650 static bool img_request_layered_test(struct rbd_img_request *img_request)
1651 {
1652         smp_mb();
1653         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1654 }
1655
1656 static void
1657 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1658 {
1659         u64 xferred = obj_request->xferred;
1660         u64 length = obj_request->length;
1661
1662         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1663                 obj_request, obj_request->img_request, obj_request->result,
1664                 xferred, length);
1665         /*
1666          * ENOENT means a hole in the image.  We zero-fill the entire
1667          * length of the request.  A short read also implies zero-fill
1668          * to the end of the request.  An error requires the whole
1669          * length of the request to be reported finished with an error
1670          * to the block layer.  In each case we update the xferred
1671          * count to indicate the whole request was satisfied.
1672          */
1673         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1674         if (obj_request->result == -ENOENT) {
1675                 if (obj_request->type == OBJ_REQUEST_BIO)
1676                         zero_bio_chain(obj_request->bio_list, 0);
1677                 else
1678                         zero_pages(obj_request->pages, 0, length);
1679                 obj_request->result = 0;
1680         } else if (xferred < length && !obj_request->result) {
1681                 if (obj_request->type == OBJ_REQUEST_BIO)
1682                         zero_bio_chain(obj_request->bio_list, xferred);
1683                 else
1684                         zero_pages(obj_request->pages, xferred, length);
1685         }
1686         obj_request->xferred = length;
1687         obj_request_done_set(obj_request);
1688 }
1689
1690 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1691 {
1692         dout("%s: obj %p cb %p\n", __func__, obj_request,
1693                 obj_request->callback);
1694         if (obj_request->callback)
1695                 obj_request->callback(obj_request);
1696         else
1697                 complete_all(&obj_request->completion);
1698 }
1699
1700 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1701 {
1702         dout("%s: obj %p\n", __func__, obj_request);
1703         obj_request_done_set(obj_request);
1704 }
1705
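/*
 * Handle completion of an osd read op.  For a layered image, a read
 * that found a hole (-ENOENT) within the parent overlap is redirected
 * to the parent image.  Other image reads are zero-filled as needed;
 * bare object reads are simply marked done.
 */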
1706 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1707 {
1708         struct rbd_img_request *img_request = NULL;
1709         struct rbd_device *rbd_dev = NULL;
1710         bool layered = false;
1711
1712         if (obj_request_img_data_test(obj_request)) {
1713                 img_request = obj_request->img_request;
1714                 layered = img_request && img_request_layered_test(img_request);
1715                 rbd_dev = img_request->rbd_dev;
1716         }
1717
1718         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1719                 obj_request, img_request, obj_request->result,
1720                 obj_request->xferred, obj_request->length);
1721         if (layered && obj_request->result == -ENOENT &&
1722                         obj_request->img_offset < rbd_dev->parent_overlap)
1723                 rbd_img_parent_read(obj_request);
1724         else if (img_request)
1725                 rbd_img_obj_request_read_callback(obj_request);
1726         else
1727                 obj_request_done_set(obj_request);
1728 }
1729
1730 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1731 {
1732         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1733                 obj_request->result, obj_request->length);
1734         /*
1735          * There is no such thing as a successful short write.  Set
1736          * it to our originally-requested length.
1737          */
1738         obj_request->xferred = obj_request->length;
1739         obj_request_done_set(obj_request);
1740 }
1741
1742 /*
1743  * For a simple stat call there's nothing to do.  We'll do more if
1744  * this is part of a write sequence for a layered image.
1745  */
1746 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1747 {
1748         dout("%s: obj %p\n", __func__, obj_request);
1749         obj_request_done_set(obj_request);
1750 }
1751
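/*
 * Completion callback for all rbd osd requests.  Called by the osd
 * client when a reply message arrives; records the result and transfer
 * count, then dispatches on the opcode of the first op in the request.
 */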
1752 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1753                                 struct ceph_msg *msg)
1754 {
1755         struct rbd_obj_request *obj_request = osd_req->r_priv;
1756         u16 opcode;
1757
1758         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1759         rbd_assert(osd_req == obj_request->osd_req);
1760         if (obj_request_img_data_test(obj_request)) {
1761                 rbd_assert(obj_request->img_request);
1762                 rbd_assert(obj_request->which != BAD_WHICH);
1763         } else {
1764                 rbd_assert(obj_request->which == BAD_WHICH);
1765         }
1766
1767         if (osd_req->r_result < 0)
1768                 obj_request->result = osd_req->r_result;
1769
1770         rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1771
1772         /*
1773          * We support a 64-bit length, but ultimately it has to be
1774          * passed to blk_end_request(), which takes an unsigned int.
1775          */
1776         obj_request->xferred = osd_req->r_reply_op_len[0];
1777         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1778
1779         opcode = osd_req->r_ops[0].op;
1780         switch (opcode) {
1781         case CEPH_OSD_OP_READ:
1782                 rbd_osd_read_callback(obj_request);
1783                 break;
1784         case CEPH_OSD_OP_SETALLOCHINT:
1785                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
1786                 /* fall through */
1787         case CEPH_OSD_OP_WRITE:
1788                 rbd_osd_write_callback(obj_request);
1789                 break;
1790         case CEPH_OSD_OP_STAT:
1791                 rbd_osd_stat_callback(obj_request);
1792                 break;
1793         case CEPH_OSD_OP_CALL:
1794         case CEPH_OSD_OP_NOTIFY_ACK:
1795         case CEPH_OSD_OP_WATCH:
1796                 rbd_osd_trivial_callback(obj_request);
1797                 break;
1798         default:
1799                 rbd_warn(NULL, "%s: unsupported op %hu",
1800                         obj_request->object_name, (unsigned short) opcode);
1801                 break;
1802         }
1803
1804         if (obj_request_done_test(obj_request))
1805                 rbd_obj_request_complete(obj_request);
1806 }
1807
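/*
 * Finalize an osd request before submission.  A read records the
 * object offset and the snapshot to read from; a write records the
 * offset, the snapshot context, and a modification time.
 */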
1808 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1809 {
1810         struct rbd_img_request *img_request = obj_request->img_request;
1811         struct ceph_osd_request *osd_req = obj_request->osd_req;
1812         u64 snap_id;
1813
1814         rbd_assert(osd_req != NULL);
1815
1816         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1817         ceph_osdc_build_request(osd_req, obj_request->offset,
1818                         NULL, snap_id, NULL);
1819 }
1820
1821 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1822 {
1823         struct rbd_img_request *img_request = obj_request->img_request;
1824         struct ceph_osd_request *osd_req = obj_request->osd_req;
1825         struct ceph_snap_context *snapc;
1826         struct timespec mtime = CURRENT_TIME;
1827
1828         rbd_assert(osd_req != NULL);
1829
1830         snapc = img_request ? img_request->snapc : NULL;
1831         ceph_osdc_build_request(osd_req, obj_request->offset,
1832                         snapc, CEPH_NOSNAP, &mtime);
1833 }
1834
1835 /*
1836  * Create an osd request.  A read request has one osd op (read).
1837  * A write request has either one (watch) or two (hint+write) osd ops.
1838  * (All rbd data writes are prefixed with an allocation hint op, but
1839  * technically osd watch is a write request, hence this distinction.)
1840  */
1841 static struct ceph_osd_request *rbd_osd_req_create(
1842                                         struct rbd_device *rbd_dev,
1843                                         enum obj_operation_type op_type,
1844                                         unsigned int num_ops,
1845                                         struct rbd_obj_request *obj_request)
1846 {
1847         struct ceph_snap_context *snapc = NULL;
1848         struct ceph_osd_client *osdc;
1849         struct ceph_osd_request *osd_req;
1850
1851         if (obj_request_img_data_test(obj_request) && op_type == OBJ_OP_WRITE) {
1852                 struct rbd_img_request *img_request = obj_request->img_request;
1853
1854                 rbd_assert(img_request_write_test(img_request));
1855                 snapc = img_request->snapc;
1856         }
1857
1858         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1859
1860         /* Allocate and initialize the request, for the num_ops ops */
1861
1862         osdc = &rbd_dev->rbd_client->client->osdc;
1863         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
1864                                           GFP_ATOMIC);
1865         if (!osd_req)
1866                 return NULL;    /* ENOMEM */
1867
1868         if (op_type == OBJ_OP_WRITE)
1869                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1870         else
1871                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1872
1873         osd_req->r_callback = rbd_osd_req_callback;
1874         osd_req->r_priv = obj_request;
1875
1876         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1877         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1878
1879         return osd_req;
1880 }
1881
1882 /*
1883  * Create a copyup osd request based on the information in the
1884  * object request supplied.  A copyup request has three osd ops,
1885  * a copyup method call, a hint op, and a write op.
1886  */
1887 static struct ceph_osd_request *
1888 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1889 {
1890         struct rbd_img_request *img_request;
1891         struct ceph_snap_context *snapc;
1892         struct rbd_device *rbd_dev;
1893         struct ceph_osd_client *osdc;
1894         struct ceph_osd_request *osd_req;
1895
1896         rbd_assert(obj_request_img_data_test(obj_request));
1897         img_request = obj_request->img_request;
1898         rbd_assert(img_request);
1899         rbd_assert(img_request_write_test(img_request));
1900
1901         /* Allocate and initialize the request, for the three ops */
1902
1903         snapc = img_request->snapc;
1904         rbd_dev = img_request->rbd_dev;
1905         osdc = &rbd_dev->rbd_client->client->osdc;
1906         osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
1907         if (!osd_req)
1908                 return NULL;    /* ENOMEM */
1909
1910         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1911         osd_req->r_callback = rbd_osd_req_callback;
1912         osd_req->r_priv = obj_request;
1913
1914         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1915         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1916
1917         return osd_req;
1918 }
1919
1921 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1922 {
1923         ceph_osdc_put_request(osd_req);
1924 }
1925
1926 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1927
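/*
 * A typical synchronous use (a sketch, modeled on
 * rbd_obj_notify_ack_sync() below; op setup and error handling elided):
 *
 *      obj_request = rbd_obj_request_create(name, 0, 0, OBJ_REQUEST_NODATA);
 *      obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
 *                                                obj_request);
 *      rbd_osd_req_format_read(obj_request);
 *      osdc = &rbd_dev->rbd_client->client->osdc;
 *      ret = rbd_obj_request_submit(osdc, obj_request);
 *      if (!ret)
 *              ret = rbd_obj_request_wait(obj_request);
 *      rbd_obj_request_put(obj_request);
 */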
1928 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1929                                                 u64 offset, u64 length,
1930                                                 enum obj_request_type type)
1931 {
1932         struct rbd_obj_request *obj_request;
1933         size_t size;
1934         char *name;
1935
1936         rbd_assert(obj_request_type_valid(type));
1937
1938         size = strlen(object_name) + 1;
1939         name = kmalloc(size, GFP_KERNEL);
1940         if (!name)
1941                 return NULL;
1942
1943         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1944         if (!obj_request) {
1945                 kfree(name);
1946                 return NULL;
1947         }
1948
1949         obj_request->object_name = memcpy(name, object_name, size);
1950         obj_request->offset = offset;
1951         obj_request->length = length;
1952         obj_request->flags = 0;
1953         obj_request->which = BAD_WHICH;
1954         obj_request->type = type;
1955         INIT_LIST_HEAD(&obj_request->links);
1956         init_completion(&obj_request->completion);
1957         kref_init(&obj_request->kref);
1958
1959         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1960                 offset, length, (int)type, obj_request);
1961
1962         return obj_request;
1963 }
1964
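/*
 * Release callback for an object request's kref.  The request must no
 * longer belong to an image request.  Frees the osd request, any
 * attached bio chain or page vector, and the object name.
 */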
1965 static void rbd_obj_request_destroy(struct kref *kref)
1966 {
1967         struct rbd_obj_request *obj_request;
1968
1969         obj_request = container_of(kref, struct rbd_obj_request, kref);
1970
1971         dout("%s: obj %p\n", __func__, obj_request);
1972
1973         rbd_assert(obj_request->img_request == NULL);
1974         rbd_assert(obj_request->which == BAD_WHICH);
1975
1976         if (obj_request->osd_req)
1977                 rbd_osd_req_destroy(obj_request->osd_req);
1978
1979         rbd_assert(obj_request_type_valid(obj_request->type));
1980         switch (obj_request->type) {
1981         case OBJ_REQUEST_NODATA:
1982                 break;          /* Nothing to do */
1983         case OBJ_REQUEST_BIO:
1984                 if (obj_request->bio_list)
1985                         bio_chain_put(obj_request->bio_list);
1986                 break;
1987         case OBJ_REQUEST_PAGES:
1988                 if (obj_request->pages)
1989                         ceph_release_page_vector(obj_request->pages,
1990                                                 obj_request->page_count);
1991                 break;
1992         }
1993
1994         kfree(obj_request->object_name);
1995         obj_request->object_name = NULL;
1996         kmem_cache_free(rbd_obj_request_cache, obj_request);
1997 }
1998
1999 /* It's OK to call this for a device with no parent */
2000
2001 static void rbd_spec_put(struct rbd_spec *spec);
2002 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2003 {
2004         rbd_dev_remove_parent(rbd_dev);
2005         rbd_spec_put(rbd_dev->parent_spec);
2006         rbd_dev->parent_spec = NULL;
2007         rbd_dev->parent_overlap = 0;
2008 }
2009
2010 /*
2011  * Parent image reference counting is used to determine when an
2012  * image's parent fields can be safely torn down--after there are no
2013  * more in-flight requests to the parent image.  When the last
2014  * reference is dropped, cleaning them up is safe.
2015  */
2016 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2017 {
2018         int counter;
2019
2020         if (!rbd_dev->parent_spec)
2021                 return;
2022
2023         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2024         if (counter > 0)
2025                 return;
2026
2027         /* Last reference; clean up parent data structures */
2028
2029         if (!counter)
2030                 rbd_dev_unparent(rbd_dev);
2031         else
2032                 rbd_warn(rbd_dev, "parent reference underflow");
2033 }
2034
2035 /*
2036  * If an image has a non-zero parent overlap, get a reference to its
2037  * parent.
2038  *
2039  * We must get the reference before checking for the overlap to
2040  * coordinate properly with zeroing the parent overlap in
2041  * rbd_dev_v2_parent_info() when an image gets flattened.  We
2042  * drop it again if there is no overlap.
2043  *
2044  * Returns true if the rbd device has a parent with a non-zero
2045  * overlap and a reference for it was successfully taken, or
2046  * false otherwise.
2047  */
2048 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2049 {
2050         int counter;
2051
2052         if (!rbd_dev->parent_spec)
2053                 return false;
2054
2055         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2056         if (counter > 0 && rbd_dev->parent_overlap)
2057                 return true;
2058
2059         /* Image was flattened, but parent is not yet torn down */
2060
2061         if (counter < 0)
2062                 rbd_warn(rbd_dev, "parent reference overflow");
2063
2064         return false;
2065 }
2066
2067 /*
2068  * Caller is responsible for filling in the list of object requests
2069  * that comprises the image request, and the Linux request pointer
2070  * (if there is one).
2071  */
2072 static struct rbd_img_request *rbd_img_request_create(
2073                                         struct rbd_device *rbd_dev,
2074                                         u64 offset, u64 length,
2075                                         enum obj_operation_type op_type,
2076                                         struct ceph_snap_context *snapc)
2077 {
2078         struct rbd_img_request *img_request;
2079
2080         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2081         if (!img_request)
2082                 return NULL;
2083
2084         img_request->rq = NULL;
2085         img_request->rbd_dev = rbd_dev;
2086         img_request->offset = offset;
2087         img_request->length = length;
2088         img_request->flags = 0;
2089         if (op_type == OBJ_OP_WRITE) {
2090                 img_request_write_set(img_request);
2091                 img_request->snapc = snapc;
2092         } else {
2093                 img_request->snap_id = rbd_dev->spec->snap_id;
2094         }
2095         if (rbd_dev_parent_get(rbd_dev))
2096                 img_request_layered_set(img_request);
2097         spin_lock_init(&img_request->completion_lock);
2098         img_request->next_completion = 0;
2099         img_request->callback = NULL;
2100         img_request->result = 0;
2101         img_request->obj_request_count = 0;
2102         INIT_LIST_HEAD(&img_request->obj_requests);
2103         kref_init(&img_request->kref);
2104
2105         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2106                 obj_op_name(op_type), offset, length, img_request);
2107
2108         return img_request;
2109 }
2110
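/*
 * Release callback for an image request's kref.  Drops every object
 * request still attached, releases the parent reference for layered
 * requests, and puts the snapshot context for writes.
 */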
2111 static void rbd_img_request_destroy(struct kref *kref)
2112 {
2113         struct rbd_img_request *img_request;
2114         struct rbd_obj_request *obj_request;
2115         struct rbd_obj_request *next_obj_request;
2116
2117         img_request = container_of(kref, struct rbd_img_request, kref);
2118
2119         dout("%s: img %p\n", __func__, img_request);
2120
2121         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2122                 rbd_img_obj_request_del(img_request, obj_request);
2123         rbd_assert(img_request->obj_request_count == 0);
2124
2125         if (img_request_layered_test(img_request)) {
2126                 img_request_layered_clear(img_request);
2127                 rbd_dev_parent_put(img_request->rbd_dev);
2128         }
2129
2130         if (img_request_write_test(img_request))
2131                 ceph_put_snap_context(img_request->snapc);
2132
2133         kmem_cache_free(rbd_img_request_cache, img_request);
2134 }
2135
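/*
 * Create a child image request directed at the parent image on behalf
 * of the given object request.  An extra reference pins the object
 * request for the lifetime of the parent request.
 */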
2136 static struct rbd_img_request *rbd_parent_request_create(
2137                                         struct rbd_obj_request *obj_request,
2138                                         u64 img_offset, u64 length)
2139 {
2140         struct rbd_img_request *parent_request;
2141         struct rbd_device *rbd_dev;
2142
2143         rbd_assert(obj_request->img_request);
2144         rbd_dev = obj_request->img_request->rbd_dev;
2145
2146         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2147                                                 length, OBJ_OP_READ, NULL);
2148         if (!parent_request)
2149                 return NULL;
2150
2151         img_request_child_set(parent_request);
2152         rbd_obj_request_get(obj_request);
2153         parent_request->obj_request = obj_request;
2154
2155         return parent_request;
2156 }
2157
2158 static void rbd_parent_request_destroy(struct kref *kref)
2159 {
2160         struct rbd_img_request *parent_request;
2161         struct rbd_obj_request *orig_request;
2162
2163         parent_request = container_of(kref, struct rbd_img_request, kref);
2164         orig_request = parent_request->obj_request;
2165
2166         parent_request->obj_request = NULL;
2167         rbd_obj_request_put(orig_request);
2168         img_request_child_clear(parent_request);
2169
2170         rbd_img_request_destroy(kref);
2171 }
2172
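/*
 * Finish one object request belonging to an image request: record any
 * error in the image request and, for requests backed by a block layer
 * request, report the completed portion.  Returns true if more of the
 * image request remains outstanding.
 */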
2173 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2174 {
2175         struct rbd_img_request *img_request;
2176         unsigned int xferred;
2177         int result;
2178         bool more;
2179
2180         rbd_assert(obj_request_img_data_test(obj_request));
2181         img_request = obj_request->img_request;
2182
2183         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2184         xferred = (unsigned int)obj_request->xferred;
2185         result = obj_request->result;
2186         if (result) {
2187                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2188                 enum obj_operation_type op_type;
2189
2190                 op_type = img_request_write_test(img_request) ? OBJ_OP_WRITE :
2191                                                                 OBJ_OP_READ;
2192
2193                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2194                         obj_op_name(op_type), obj_request->length,
2195                         obj_request->img_offset, obj_request->offset);
2196                 rbd_warn(rbd_dev, "  result %d xferred %x",
2197                         result, xferred);
2198                 if (!img_request->result)
2199                         img_request->result = result;
2200         }
2201
2202         /* Image object requests don't own their page array */
2203
2204         if (obj_request->type == OBJ_REQUEST_PAGES) {
2205                 obj_request->pages = NULL;
2206                 obj_request->page_count = 0;
2207         }
2208
2209         if (img_request_child_test(img_request)) {
2210                 rbd_assert(img_request->obj_request != NULL);
2211                 more = obj_request->which < img_request->obj_request_count - 1;
2212         } else {
2213                 rbd_assert(img_request->rq != NULL);
2214                 more = blk_end_request(img_request->rq, result, xferred);
2215         }
2216
2217         return more;
2218 }
2219
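/*
 * Per-object completion callback for image requests.  Object requests
 * may complete in any order, but the image request is ended strictly
 * in order: only the request matching next_completion advances the
 * completion, sweeping up any later requests that already finished.
 */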
2220 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2221 {
2222         struct rbd_img_request *img_request;
2223         u32 which = obj_request->which;
2224         bool more = true;
2225
2226         rbd_assert(obj_request_img_data_test(obj_request));
2227         img_request = obj_request->img_request;
2228
2229         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2230         rbd_assert(img_request != NULL);
2231         rbd_assert(img_request->obj_request_count > 0);
2232         rbd_assert(which != BAD_WHICH);
2233         rbd_assert(which < img_request->obj_request_count);
2234
2235         spin_lock_irq(&img_request->completion_lock);
2236         if (which != img_request->next_completion)
2237                 goto out;
2238
2239         for_each_obj_request_from(img_request, obj_request) {
2240                 rbd_assert(more);
2241                 rbd_assert(which < img_request->obj_request_count);
2242
2243                 if (!obj_request_done_test(obj_request))
2244                         break;
2245                 more = rbd_img_obj_end_request(obj_request);
2246                 which++;
2247         }
2248
2249         rbd_assert(more ^ (which == img_request->obj_request_count));
2250         img_request->next_completion = which;
2251 out:
2252         spin_unlock_irq(&img_request->completion_lock);
2253         rbd_img_request_put(img_request);
2254
2255         if (!more)
2256                 rbd_img_request_complete(img_request);
2257 }
2258
2259 /*
2260  * Split up an image request into one or more object requests, each
2261  * to a different object.  The "type" parameter indicates whether
2262  * "data_desc" is the pointer to the head of a list of bio
2263  * structures, or the base of a page array.  In either case this
2264  * function assumes data_desc describes memory sufficient to hold
2265  * all data described by the image request.
2266  */
2267 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2268                                         enum obj_request_type type,
2269                                         void *data_desc)
2270 {
2271         struct rbd_device *rbd_dev = img_request->rbd_dev;
2272         struct rbd_obj_request *obj_request = NULL;
2273         struct rbd_obj_request *next_obj_request;
2274         struct bio *bio_list = NULL;
2275         unsigned int bio_offset = 0;
2276         struct page **pages = NULL;
2277         enum obj_operation_type op_type;
2278         u64 img_offset;
2279         u64 resid;
2280         u16 opcode;
2281
2282         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2283                 (int)type, data_desc);
2284
2285         img_offset = img_request->offset;
2286         resid = img_request->length;
2287         rbd_assert(resid > 0);
2288
2289         if (type == OBJ_REQUEST_BIO) {
2290                 bio_list = data_desc;
2291                 rbd_assert(img_offset ==
2292                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2293         } else {
2294                 rbd_assert(type == OBJ_REQUEST_PAGES);
2295                 pages = data_desc;
2296         }
2297
2298         while (resid) {
2299                 struct ceph_osd_request *osd_req;
2300                 const char *object_name;
2301                 u64 offset;
2302                 u64 length;
2303                 unsigned int which = 0;
2304
2305                 object_name = rbd_segment_name(rbd_dev, img_offset);
2306                 if (!object_name)
2307                         goto out_unwind;
2308                 offset = rbd_segment_offset(rbd_dev, img_offset);
2309                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2310                 obj_request = rbd_obj_request_create(object_name,
2311                                                 offset, length, type);
2312                 /* object request has its own copy of the object name */
2313                 rbd_segment_name_free(object_name);
2314                 if (!obj_request)
2315                         goto out_unwind;
2316
2317                 /*
2318                  * set obj_request->img_request before creating the
2319                  * osd_request so that it gets the right snapc
2320                  */
2321                 rbd_img_obj_request_add(img_request, obj_request);
2322
2323                 if (type == OBJ_REQUEST_BIO) {
2324                         unsigned int clone_size;
2325
2326                         rbd_assert(length <= (u64)UINT_MAX);
2327                         clone_size = (unsigned int)length;
2328                         obj_request->bio_list =
2329                                         bio_chain_clone_range(&bio_list,
2330                                                                 &bio_offset,
2331                                                                 clone_size,
2332                                                                 GFP_ATOMIC);
2333                         if (!obj_request->bio_list)
2334                                 goto out_unwind;
2335                 } else {
2336                         unsigned int page_count;
2337
2338                         obj_request->pages = pages;
2339                         page_count = (u32)calc_pages_for(offset, length);
2340                         obj_request->page_count = page_count;
2341                         if ((offset + length) & ~PAGE_MASK)
2342                                 page_count--;   /* more on last page */
2343                         pages += page_count;
2344                 }
2345
2346                 if (img_request_write_test(img_request)) {
2347                         op_type = OBJ_OP_WRITE;
2348                         opcode = CEPH_OSD_OP_WRITE;
2349                 } else {
2350                         op_type = OBJ_OP_READ;
2351                         opcode = CEPH_OSD_OP_READ;
2352                 }
2353
2354                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2355                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2356                                         obj_request);
2357                 if (!osd_req)
2358                         goto out_unwind;
2359                 obj_request->osd_req = osd_req;
2360                 obj_request->callback = rbd_img_obj_callback;
2361                 rbd_img_request_get(img_request);
2362
2363                 if (op_type == OBJ_OP_WRITE) {
2364                         osd_req_op_alloc_hint_init(osd_req, which,
2365                                              rbd_obj_bytes(&rbd_dev->header),
2366                                              rbd_obj_bytes(&rbd_dev->header));
2367                         which++;
2368                 }
2369
2370                 osd_req_op_extent_init(osd_req, which, opcode, offset, length,
2371                                        0, 0);
2372                 if (type == OBJ_REQUEST_BIO)
2373                         osd_req_op_extent_osd_data_bio(osd_req, which,
2374                                         obj_request->bio_list, length);
2375                 else
2376                         osd_req_op_extent_osd_data_pages(osd_req, which,
2377                                         obj_request->pages, length,
2378                                         offset & ~PAGE_MASK, false, false);
2379
2380                 if (op_type == OBJ_OP_WRITE)
2381                         rbd_osd_req_format_write(obj_request);
2382                 else
2383                         rbd_osd_req_format_read(obj_request);
2384
2385                 obj_request->img_offset = img_offset;
2386
2387                 img_offset += length;
2388                 resid -= length;
2389         }
2390
2391         return 0;
2392
2393 out_unwind:
2394         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2395                 rbd_img_obj_request_del(img_request, obj_request);
2396
2397         return -ENOMEM;
2398 }
2399
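/*
 * Completion callback for a copyup request.  Releases the page vector
 * that carried the parent data, fixes up the transfer count, and
 * finishes through the normal image object callback.
 */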
2400 static void
2401 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2402 {
2403         struct rbd_img_request *img_request;
2404         struct rbd_device *rbd_dev;
2405         struct page **pages;
2406         u32 page_count;
2407
2408         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2409         rbd_assert(obj_request_img_data_test(obj_request));
2410         img_request = obj_request->img_request;
2411         rbd_assert(img_request);
2412
2413         rbd_dev = img_request->rbd_dev;
2414         rbd_assert(rbd_dev);
2415
2416         pages = obj_request->copyup_pages;
2417         rbd_assert(pages != NULL);
2418         obj_request->copyup_pages = NULL;
2419         page_count = obj_request->copyup_page_count;
2420         rbd_assert(page_count);
2421         obj_request->copyup_page_count = 0;
2422         ceph_release_page_vector(pages, page_count);
2423
2424         /*
2425          * We want the transfer count to reflect the size of the
2426          * original write request.  There is no such thing as a
2427          * successful short write, so if the request was successful
2428          * we can just set it to the originally-requested length.
2429          */
2430         if (!obj_request->result)
2431                 obj_request->xferred = obj_request->length;
2432
2433         /* Finish up with the normal image object callback */
2434
2435         rbd_img_obj_callback(obj_request);
2436 }
2437
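/*
 * Completion callback for the parent read issued by
 * rbd_img_obj_parent_read_full().  Replaces the original write's osd
 * request with a three-op copyup request carrying the parent data and
 * submits it.
 */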
2438 static void
2439 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2440 {
2441         struct rbd_obj_request *orig_request;
2442         struct ceph_osd_request *osd_req;
2443         struct ceph_osd_client *osdc;
2444         struct rbd_device *rbd_dev;
2445         struct page **pages;
2446         u32 page_count;
2447         int img_result;
2448         u64 parent_length;
2449         u64 offset;
2450         u64 length;
2451
2452         rbd_assert(img_request_child_test(img_request));
2453
2454         /* First get what we need from the image request */
2455
2456         pages = img_request->copyup_pages;
2457         rbd_assert(pages != NULL);
2458         img_request->copyup_pages = NULL;
2459         page_count = img_request->copyup_page_count;
2460         rbd_assert(page_count);
2461         img_request->copyup_page_count = 0;
2462
2463         orig_request = img_request->obj_request;
2464         rbd_assert(orig_request != NULL);
2465         rbd_assert(obj_request_type_valid(orig_request->type));
2466         img_result = img_request->result;
2467         parent_length = img_request->length;
2468         rbd_assert(parent_length == img_request->xferred);
2469         rbd_img_request_put(img_request);
2470
2471         rbd_assert(orig_request->img_request);
2472         rbd_dev = orig_request->img_request->rbd_dev;
2473         rbd_assert(rbd_dev);
2474
2475         /*
2476          * If the overlap has become 0 (most likely because the
2477          * image has been flattened) we need to free the pages
2478          * and re-submit the original write request.
2479          */
2480         if (!rbd_dev->parent_overlap) {
2481                 struct ceph_osd_client *osdc;
2482
2483                 ceph_release_page_vector(pages, page_count);
2484                 osdc = &rbd_dev->rbd_client->client->osdc;
2485                 img_result = rbd_obj_request_submit(osdc, orig_request);
2486                 if (!img_result)
2487                         return;
2488         }
2489
2490         if (img_result)
2491                 goto out_err;
2492
2493         /*
2494          * The original osd request is of no use to us any more.
2495          * We need a new one that can hold the three ops in a copyup
2496          * request.  Allocate the new copyup osd request for the
2497          * original request, and release the old one.
2498          */
2499         img_result = -ENOMEM;
2500         osd_req = rbd_osd_req_create_copyup(orig_request);
2501         if (!osd_req)
2502                 goto out_err;
2503         rbd_osd_req_destroy(orig_request->osd_req);
2504         orig_request->osd_req = osd_req;
2505         orig_request->copyup_pages = pages;
2506         orig_request->copyup_page_count = page_count;
2507
2508         /* Initialize the copyup op */
2509
2510         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2511         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2512                                                 false, false);
2513
2514         /* Then the hint op */
2515
2516         osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
2517                                    rbd_obj_bytes(&rbd_dev->header));
2518
2519         /* And the original write request op */
2520
2521         offset = orig_request->offset;
2522         length = orig_request->length;
2523         osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
2524                                         offset, length, 0, 0);
2525         if (orig_request->type == OBJ_REQUEST_BIO)
2526                 osd_req_op_extent_osd_data_bio(osd_req, 2,
2527                                         orig_request->bio_list, length);
2528         else
2529                 osd_req_op_extent_osd_data_pages(osd_req, 2,
2530                                         orig_request->pages, length,
2531                                         offset & ~PAGE_MASK, false, false);
2532
2533         rbd_osd_req_format_write(orig_request);
2534
2535         /* All set, send it off. */
2536
2537         orig_request->callback = rbd_img_obj_copyup_callback;
2538         osdc = &rbd_dev->rbd_client->client->osdc;
2539         img_result = rbd_obj_request_submit(osdc, orig_request);
2540         if (!img_result)
2541                 return;
2542 out_err:
2543         /* Record the error code and complete the request */
2544
2545         orig_request->result = img_result;
2546         orig_request->xferred = 0;
2547         obj_request_done_set(orig_request);
2548         rbd_obj_request_complete(orig_request);
2549 }
2550
2551 /*
2552  * Read from the parent image the range of data that covers the
2553  * entire target of the given object request.  This is used for
2554  * satisfying a layered image write request when the target of an
2555  * object request from the image request does not exist.
2556  *
2557  * A page array big enough to hold the returned data is allocated
2558  * and supplied to rbd_img_request_fill() as the "data descriptor."
2559  * When the read completes, this page array will be transferred to
2560  * the original object request for the copyup operation.
2561  *
2562  * If an error occurs, record it as the result of the original
2563  * object request and mark it done so it gets completed.
2564  */
2565 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2566 {
2567         struct rbd_img_request *img_request = NULL;
2568         struct rbd_img_request *parent_request = NULL;
2569         struct rbd_device *rbd_dev;
2570         u64 img_offset;
2571         u64 length;
2572         struct page **pages = NULL;
2573         u32 page_count;
2574         int result;
2575
2576         rbd_assert(obj_request_img_data_test(obj_request));
2577         rbd_assert(obj_request_type_valid(obj_request->type));
2578
2579         img_request = obj_request->img_request;
2580         rbd_assert(img_request != NULL);
2581         rbd_dev = img_request->rbd_dev;
2582         rbd_assert(rbd_dev->parent != NULL);
2583
2584         /*
2585          * Determine the byte range covered by the object in the
2586          * child image to which the original request was to be sent.
2587          */
2588         img_offset = obj_request->img_offset - obj_request->offset;
2589         length = (u64)1 << rbd_dev->header.obj_order;
2590
2591         /*
2592          * There is no defined parent data beyond the parent
2593          * overlap, so limit what we read at that boundary if
2594          * necessary.
2595          */
2596         if (img_offset + length > rbd_dev->parent_overlap) {
2597                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2598                 length = rbd_dev->parent_overlap - img_offset;
2599         }
2600
2601         /*
2602          * Allocate a page array big enough to receive the data read
2603          * from the parent.
2604          */
2605         page_count = (u32)calc_pages_for(0, length);
2606         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2607         if (IS_ERR(pages)) {
2608                 result = PTR_ERR(pages);
2609                 pages = NULL;
2610                 goto out_err;
2611         }
2612
2613         result = -ENOMEM;
2614         parent_request = rbd_parent_request_create(obj_request,
2615                                                 img_offset, length);
2616         if (!parent_request)
2617                 goto out_err;
2618
2619         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2620         if (result)
2621                 goto out_err;
2622         parent_request->copyup_pages = pages;
2623         parent_request->copyup_page_count = page_count;
2624
2625         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2626         result = rbd_img_request_submit(parent_request);
2627         if (!result)
2628                 return 0;
2629
2630         parent_request->copyup_pages = NULL;
2631         parent_request->copyup_page_count = 0;
2632         parent_request->obj_request = NULL;
2633         rbd_obj_request_put(obj_request);
2634 out_err:
2635         if (pages)
2636                 ceph_release_page_vector(pages, page_count);
2637         if (parent_request)
2638                 rbd_img_request_put(parent_request);
2639         obj_request->result = result;
2640         obj_request->xferred = 0;
2641         obj_request_done_set(obj_request);
2642
2643         return result;
2644 }
2645
2646 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2647 {
2648         struct rbd_obj_request *orig_request;
2649         struct rbd_device *rbd_dev;
2650         int result;
2651
2652         rbd_assert(!obj_request_img_data_test(obj_request));
2653
2654         /*
2655          * All we need from the object request is the original request
2656          * and the result of the STAT op.  Grab those, then we're done
2657          * with it; the image request still holds a ref on the original.
2658          */
2659         orig_request = obj_request->obj_request;
2660         obj_request->obj_request = NULL;
2661         rbd_obj_request_put(orig_request);
2662         rbd_assert(orig_request);
2663         rbd_assert(orig_request->img_request);
2664
2665         result = obj_request->result;
2666         obj_request->result = 0;
2667
2668         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2669                 obj_request, orig_request, result,
2670                 obj_request->xferred, obj_request->length);
2671         rbd_obj_request_put(obj_request);
2672
2673         /*
2674          * If the overlap has become 0 (most likely because the
2675          * image has been flattened) we need to re-submit the
2676          * original write request.
2677          */
2678         rbd_dev = orig_request->img_request->rbd_dev;
2679         if (!rbd_dev->parent_overlap) {
2680                 struct ceph_osd_client *osdc;
2681
2682                 osdc = &rbd_dev->rbd_client->client->osdc;
2683                 result = rbd_obj_request_submit(osdc, orig_request);
2684                 if (!result)
2685                         return;
2686         }
2687
2688         /*
2689          * Our only purpose here is to determine whether the object
2690          * exists, and we don't want to treat the non-existence as
2691          * an error.  If something else comes back, transfer the
2692          * error to the original request and complete it now.
2693          */
2694         if (!result) {
2695                 obj_request_existence_set(orig_request, true);
2696         } else if (result == -ENOENT) {
2697                 obj_request_existence_set(orig_request, false);
2698         } else if (result) {
2699                 orig_request->result = result;
2700                 goto out;
2701         }
2702
2703         /*
2704          * Resubmit the original request now that we have recorded
2705          * whether the target object exists.
2706          */
2707         orig_request->result = rbd_img_obj_request_submit(orig_request);
2708 out:
2709         if (orig_request->result)
2710                 rbd_obj_request_complete(orig_request);
2711 }
2712
2713 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2714 {
2715         struct rbd_obj_request *stat_request;
2716         struct rbd_device *rbd_dev;
2717         struct ceph_osd_client *osdc;
2718         struct page **pages = NULL;
2719         u32 page_count;
2720         size_t size;
2721         int ret;
2722
2723         /*
2724          * The response data for a STAT call consists of:
2725          *     le64 length;
2726          *     struct {
2727          *         le32 tv_sec;
2728          *         le32 tv_nsec;
2729          *     } mtime;
2730          */
2731         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2732         page_count = (u32)calc_pages_for(0, size);
2733         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2734         if (IS_ERR(pages))
2735                 return PTR_ERR(pages);
2736
2737         ret = -ENOMEM;
2738         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2739                                                         OBJ_REQUEST_PAGES);
2740         if (!stat_request)
2741                 goto out;
2742
2743         rbd_obj_request_get(obj_request);
2744         stat_request->obj_request = obj_request;
2745         stat_request->pages = pages;
2746         stat_request->page_count = page_count;
2747
2748         rbd_assert(obj_request->img_request);
2749         rbd_dev = obj_request->img_request->rbd_dev;
2750         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2751                                                    stat_request);
2752         if (!stat_request->osd_req)
2753                 goto out;
2754         stat_request->callback = rbd_img_obj_exists_callback;
2755
2756         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2757         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2758                                         false, false);
2759         rbd_osd_req_format_read(stat_request);
2760
2761         osdc = &rbd_dev->rbd_client->client->osdc;
2762         ret = rbd_obj_request_submit(osdc, stat_request);
2763 out:
2764         if (ret)
2765                 rbd_obj_request_put(obj_request);
2766
2767         return ret;
2768 }
2769
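/*
 * Return true if the object request can be submitted directly to the
 * osd, i.e. if no copyup handling from a parent image is required.
 */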
2770 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2771 {
2772         struct rbd_img_request *img_request;
2773         struct rbd_device *rbd_dev;
2774
2775         rbd_assert(obj_request_img_data_test(obj_request));
2776
2777         img_request = obj_request->img_request;
2778         rbd_assert(img_request);
2779         rbd_dev = img_request->rbd_dev;
2780
2781         /* Reads */
2782         if (!img_request_write_test(img_request))
2783                 return true;
2784
2785         /* Non-layered writes */
2786         if (!img_request_layered_test(img_request))
2787                 return true;
2788
2789         /*
2790          * Layered writes outside of the parent overlap range don't
2791          * share any data with the parent.
2792          */
2793         if (!obj_request_overlaps_parent(obj_request))
2794                 return true;
2795
2796         /*
2797          * Entire-object layered writes - we will overwrite whatever
2798          * parent data there is anyway.
2799          */
2800         if (!obj_request->offset &&
2801             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2802                 return true;
2803
2804         /*
2805          * If the object is known to already exist, its parent data has
2806          * already been copied.
2807          */
2808         if (obj_request_known_test(obj_request) &&
2809             obj_request_exists_test(obj_request))
2810                 return true;
2811
2812         return false;
2813 }
2814
2815 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2816 {
2817         if (img_obj_request_simple(obj_request)) {
2818                 struct rbd_device *rbd_dev;
2819                 struct ceph_osd_client *osdc;
2820
2821                 rbd_dev = obj_request->img_request->rbd_dev;
2822                 osdc = &rbd_dev->rbd_client->client->osdc;
2823
2824                 return rbd_obj_request_submit(osdc, obj_request);
2825         }
2826
2827         /*
2828          * It's a layered write.  The target object might exist but
2829          * we may not know that yet.  If we know it doesn't exist,
2830          * start by reading the data for the full target object from
2831          * the parent so we can use it for a copyup to the target.
2832          */
2833         if (obj_request_known_test(obj_request))
2834                 return rbd_img_obj_parent_read_full(obj_request);
2835
2836         /* We don't know whether the target exists.  Go find out. */
2837
2838         return rbd_img_obj_exists_submit(obj_request);
2839 }
2840
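/* Submit each object request comprising the image request */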
2841 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2842 {
2843         struct rbd_obj_request *obj_request;
2844         struct rbd_obj_request *next_obj_request;
2845
2846         dout("%s: img %p\n", __func__, img_request);
2847         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2848                 int ret;
2849
2850                 ret = rbd_img_obj_request_submit(obj_request);
2851                 if (ret)
2852                         return ret;
2853         }
2854
2855         return 0;
2856 }
2857
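/*
 * Completion callback for a read issued to the parent image.  Zeroes
 * any data beyond the parent overlap by pretending the parent read
 * came up short, then completes the original object request.
 */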
2858 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2859 {
2860         struct rbd_obj_request *obj_request;
2861         struct rbd_device *rbd_dev;
2862         u64 obj_end;
2863         u64 img_xferred;
2864         int img_result;
2865
2866         rbd_assert(img_request_child_test(img_request));
2867
2868         /* First get what we need from the image request and release it */
2869
2870         obj_request = img_request->obj_request;
2871         img_xferred = img_request->xferred;
2872         img_result = img_request->result;
2873         rbd_img_request_put(img_request);
2874
2875         /*
2876          * If the overlap has become 0 (most likely because the
2877          * image has been flattened) we need to re-submit the
2878          * original request.
2879          */
2880         rbd_assert(obj_request);
2881         rbd_assert(obj_request->img_request);
2882         rbd_dev = obj_request->img_request->rbd_dev;
2883         if (!rbd_dev->parent_overlap) {
2884                 struct ceph_osd_client *osdc;
2885
2886                 osdc = &rbd_dev->rbd_client->client->osdc;
2887                 img_result = rbd_obj_request_submit(osdc, obj_request);
2888                 if (!img_result)
2889                         return;
2890         }
2891
2892         obj_request->result = img_result;
2893         if (obj_request->result)
2894                 goto out;
2895
2896         /*
2897          * We need to zero anything beyond the parent overlap
2898          * boundary.  Since rbd_img_obj_request_read_callback()
2899          * will zero anything beyond the end of a short read, an
2900          * easy way to do this is to pretend the data from the
2901          * parent came up short--ending at the overlap boundary.
2902          */
2903         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2904         obj_end = obj_request->img_offset + obj_request->length;
2905         if (obj_end > rbd_dev->parent_overlap) {
2906                 u64 xferred = 0;
2907
2908                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2909                         xferred = rbd_dev->parent_overlap -
2910                                         obj_request->img_offset;
2911
2912                 obj_request->xferred = min(img_xferred, xferred);
2913         } else {
2914                 obj_request->xferred = img_xferred;
2915         }
2916 out:
2917         rbd_img_obj_request_read_callback(obj_request);
2918         rbd_obj_request_complete(obj_request);
2919 }
2920
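/*
 * Satisfy an object read from the parent image.  Called when a read
 * from a layered image found no backing object (-ENOENT) within the
 * parent overlap.
 */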
2921 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2922 {
2923         struct rbd_img_request *img_request;
2924         int result;
2925
2926         rbd_assert(obj_request_img_data_test(obj_request));
2927         rbd_assert(obj_request->img_request != NULL);
2928         rbd_assert(obj_request->result == (s32) -ENOENT);
2929         rbd_assert(obj_request_type_valid(obj_request->type));
2930
2931         /* rbd_read_finish(obj_request, obj_request->length); */
2932         img_request = rbd_parent_request_create(obj_request,
2933                                                 obj_request->img_offset,
2934                                                 obj_request->length);
2935         result = -ENOMEM;
2936         if (!img_request)
2937                 goto out_err;
2938
2939         if (obj_request->type == OBJ_REQUEST_BIO)
2940                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2941                                                 obj_request->bio_list);
2942         else
2943                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2944                                                 obj_request->pages);
2945         if (result)
2946                 goto out_err;
2947
2948         img_request->callback = rbd_img_parent_read_callback;
2949         result = rbd_img_request_submit(img_request);
2950         if (result)
2951                 goto out_err;
2952
2953         return;
2954 out_err:
2955         if (img_request)
2956                 rbd_img_request_put(img_request);
2957         obj_request->result = result;
2958         obj_request->xferred = 0;
2959         obj_request_done_set(obj_request);
2960 }
2961
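/* Acknowledge a watch notification, synchronously */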
2962 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2963 {
2964         struct rbd_obj_request *obj_request;
2965         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2966         int ret;
2967
2968         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2969                                                         OBJ_REQUEST_NODATA);
2970         if (!obj_request)
2971                 return -ENOMEM;
2972
2973         ret = -ENOMEM;
2974         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2975                                                   obj_request);
2976         if (!obj_request->osd_req)
2977                 goto out;
2978
2979         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2980                                         notify_id, 0, 0);
2981         rbd_osd_req_format_read(obj_request);
2982
2983         ret = rbd_obj_request_submit(osdc, obj_request);
2984         if (ret)
2985                 goto out;
2986         ret = rbd_obj_request_wait(obj_request);
2987 out:
2988         rbd_obj_request_put(obj_request);
2989
2990         return ret;
2991 }
2992
2993 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2994 {
2995         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2996         int ret;
2997
2998         if (!rbd_dev)
2999                 return;
3000
3001         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
3002                 rbd_dev->header_name, (unsigned long long)notify_id,
3003                 (unsigned int)opcode);
3004
3005         /*
3006          * Until adequate refresh error handling is in place, there is
3007          * not much we can do here, except warn.
3008          *
3009          * See http://tracker.ceph.com/issues/5040
3010          */
3011         ret = rbd_dev_refresh(rbd_dev);
3012         if (ret)
3013                 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3014
3015         ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
3016         if (ret)
3017                 rbd_warn(rbd_dev, "notify_ack ret %d", ret);
3018 }
3019
3020 /*
3021  * Send a (un)watch request and wait for the ack.  On success, return
3022  * the request with a reference held; on failure, return an ERR_PTR.
3023  */
3024 static struct rbd_obj_request *rbd_obj_watch_request_helper(
3025                                                 struct rbd_device *rbd_dev,
3026                                                 bool watch)
3027 {
3028         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3029         struct rbd_obj_request *obj_request;
3030         int ret;
3031
3032         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3033                                              OBJ_REQUEST_NODATA);
3034         if (!obj_request)
3035                 return ERR_PTR(-ENOMEM);
3036
3037         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
3038                                                   obj_request);
3039         if (!obj_request->osd_req) {
3040                 ret = -ENOMEM;
3041                 goto out;
3042         }
3043
3044         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3045                               rbd_dev->watch_event->cookie, 0, watch);
3046         rbd_osd_req_format_write(obj_request);
3047
3048         if (watch)
3049                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3050
3051         ret = rbd_obj_request_submit(osdc, obj_request);
3052         if (ret)
3053                 goto out;
3054
3055         ret = rbd_obj_request_wait(obj_request);
3056         if (ret)
3057                 goto out;
3058
3059         ret = obj_request->result;
3060         if (ret) {
3061                 if (watch)
3062                         rbd_obj_request_end(obj_request);
3063                 goto out;
3064         }
3065
3066         return obj_request;
3067
3068 out:
3069         rbd_obj_request_put(obj_request);
3070         return ERR_PTR(ret);
3071 }
3072
3073 /*
3074  * Initiate a watch request, synchronously.
3075  */
3076 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3077 {
3078         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3079         struct rbd_obj_request *obj_request;
3080         int ret;
3081
3082         rbd_assert(!rbd_dev->watch_event);
3083         rbd_assert(!rbd_dev->watch_request);
3084
3085         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3086                                      &rbd_dev->watch_event);
3087         if (ret < 0)
3088                 return ret;
3089
3090         obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3091         if (IS_ERR(obj_request)) {
3092                 ceph_osdc_cancel_event(rbd_dev->watch_event);
3093                 rbd_dev->watch_event = NULL;
3094                 return PTR_ERR(obj_request);
3095         }
3096
3097         /*
3098          * A watch request is set to linger, so the underlying osd
3099          * request won't go away until we unregister it.  We retain
3100          * a pointer to the object request during that time (in
3101          * rbd_dev->watch_request), so we'll keep a reference to it.
3102          * We'll drop that reference after we've unregistered it in
3103          * rbd_dev_header_unwatch_sync().
3104          */
3105         rbd_dev->watch_request = obj_request;
3106
3107         return 0;
3108 }
3109
3110 /*
3111  * Tear down a watch request, synchronously.
3112  */
3113 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3114 {
3115         struct rbd_obj_request *obj_request;
3116
3117         rbd_assert(rbd_dev->watch_event);
3118         rbd_assert(rbd_dev->watch_request);
3119
3120         rbd_obj_request_end(rbd_dev->watch_request);
3121         rbd_obj_request_put(rbd_dev->watch_request);
3122         rbd_dev->watch_request = NULL;
3123
3124         obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3125         if (!IS_ERR(obj_request))
3126                 rbd_obj_request_put(obj_request);
3127         else
3128                 rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3129                          PTR_ERR(obj_request));
3130
3131         ceph_osdc_cancel_event(rbd_dev->watch_event);
3132         rbd_dev->watch_event = NULL;
3133 }
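
/*
 * Illustrative sketch (not part of the original driver): how the two
 * helpers above pair up over the life of a mapping.  The function name
 * is hypothetical; the real callers are the device probe and remove
 * paths.
 */
static void __maybe_unused rbd_watch_lifecycle_sketch(struct rbd_device *rbd_dev)
{
        if (rbd_dev_header_watch_sync(rbd_dev))  /* register, set to linger */
                return;

        /*
         * ... while mapped, rbd_watch_cb() fires on header object
         * updates, refreshes the device and acks each notification ...
         */

        rbd_dev_header_unwatch_sync(rbd_dev);    /* unregister, drop refs */
}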
3134
3135 /*
3136  * Synchronous osd object method call.  Returns the number of bytes
3137  * returned in the inbound buffer, or a negative error code.
3138  */
3139 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3140                              const char *object_name,
3141                              const char *class_name,
3142                              const char *method_name,
3143                              const void *outbound,
3144                              size_t outbound_size,
3145                              void *inbound,
3146                              size_t inbound_size)
3147 {
3148         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3149         struct rbd_obj_request *obj_request;
3150         struct page **pages;
3151         u32 page_count;
3152         int ret;
3153
3154         /*
3155          * Method calls are ultimately read operations.  The result
3156          * should be placed into the inbound buffer provided.  They
3157          * also supply outbound data--parameters for the object
3158          * method.  Currently if this is present it will be a
3159          * snapshot id.
3160          */
3161         page_count = (u32)calc_pages_for(0, inbound_size);
3162         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3163         if (IS_ERR(pages))
3164                 return PTR_ERR(pages);
3165
3166         ret = -ENOMEM;
3167         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3168                                                         OBJ_REQUEST_PAGES);
3169         if (!obj_request)
3170                 goto out;
3171
3172         obj_request->pages = pages;
3173         obj_request->page_count = page_count;
3174
3175         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3176                                                   obj_request);
3177         if (!obj_request->osd_req)
3178                 goto out;
3179
3180         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3181                                         class_name, method_name);
3182         if (outbound_size) {
3183                 struct ceph_pagelist *pagelist;
3184
3185                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3186                 if (!pagelist)
3187                         goto out;
3188
3189                 ceph_pagelist_init(pagelist);
3190                 ceph_pagelist_append(pagelist, outbound, outbound_size);
3191                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3192                                                 pagelist);
3193         }
3194         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3195                                         obj_request->pages, inbound_size,
3196                                         0, false, false);
3197         rbd_osd_req_format_read(obj_request);
3198
3199         ret = rbd_obj_request_submit(osdc, obj_request);
3200         if (ret)
3201                 goto out;
3202         ret = rbd_obj_request_wait(obj_request);
3203         if (ret)
3204                 goto out;
3205
3206         ret = obj_request->result;
3207         if (ret < 0)
3208                 goto out;
3209
3210         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3211         ret = (int)obj_request->xferred;
3212         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3213 out:
3214         if (obj_request)
3215                 rbd_obj_request_put(obj_request);
3216         else
3217                 ceph_release_page_vector(pages, page_count);
3218
3219         return ret;
3220 }
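
/*
 * Illustrative sketch (not part of the original driver): the call and
 * decode pattern that every rbd_obj_method_sync() caller below follows.
 * The function and method names here are hypothetical; see
 * _rbd_dev_v2_snap_size() for a real instance.
 */
static int __maybe_unused rbd_method_call_sketch(struct rbd_device *rbd_dev)
{
        __le64 reply = 0;
        int ret;

        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "example_method", NULL, 0,
                                &reply, sizeof (reply));
        if (ret < 0)
                return ret;             /* submit/wait or OSD error */
        if (ret < sizeof (reply))
                return -ERANGE;         /* short reply */

        return 0;                       /* le64_to_cpu(reply) is valid */
}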
3221
3222 static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
3223 {
3224         struct rbd_img_request *img_request;
3225         struct ceph_snap_context *snapc = NULL;
3226         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3227         u64 length = blk_rq_bytes(rq);
3228         enum obj_operation_type op_type;
3229         u64 mapping_size;
3230         int result;
3231
3232         if (rq->cmd_flags & REQ_WRITE)
3233                 op_type = OBJ_OP_WRITE;
3234         else
3235                 op_type = OBJ_OP_READ;
3236
3237         /* Ignore/skip any zero-length requests */
3238
3239         if (!length) {
3240                 dout("%s: zero-length request\n", __func__);
3241                 result = 0;
3242                 goto err_rq;
3243         }
3244
3245         /* Only reads are allowed to a read-only device */
3246
3247         if (op_type != OBJ_OP_READ) {
3248                 if (rbd_dev->mapping.read_only) {
3249                         result = -EROFS;
3250                         goto err_rq;
3251                 }
3252                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3253         }
3254
3255         /*
3256          * Quit early if the mapped snapshot no longer exists.  It's
3257          * still possible the snapshot will have disappeared by the
3258          * time our request arrives at the osd, but there's no sense in
3259          * sending it if we already know.
3260          */
3261         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3262                 dout("request for non-existent snapshot\n");
3263                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3264                 result = -ENXIO;
3265                 goto err_rq;
3266         }
3267
3268         if (offset && length > U64_MAX - offset + 1) {
3269                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3270                          length);
3271                 result = -EINVAL;
3272                 goto err_rq;    /* Shouldn't happen */
3273         }
3274
3275         down_read(&rbd_dev->header_rwsem);
3276         mapping_size = rbd_dev->mapping.size;
3277         if (op_type != OBJ_OP_READ) {
3278                 snapc = rbd_dev->header.snapc;
3279                 ceph_get_snap_context(snapc);
3280         }
3281         up_read(&rbd_dev->header_rwsem);
3282
3283         if (offset + length > mapping_size) {
3284                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3285                          length, mapping_size);
3286                 result = -EIO;
3287                 goto err_rq;
3288         }
3289
3290         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3291                                              snapc);
3292         if (!img_request) {
3293                 result = -ENOMEM;
3294                 goto err_rq;
3295         }
3296         img_request->rq = rq;
3297
3298         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
3299         if (result)
3300                 goto err_img_request;
3301
3302         result = rbd_img_request_submit(img_request);
3303         if (result)
3304                 goto err_img_request;
3305
3306         return;
3307
3308 err_img_request:
3309         rbd_img_request_put(img_request);
3310 err_rq:
3311         if (result)
3312                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3313                          obj_op_name(op_type), length, offset, result);
3314         if (snapc)
3315                 ceph_put_snap_context(snapc);
3316         blk_end_request_all(rq, result);
3317 }
3318
3319 static void rbd_request_workfn(struct work_struct *work)
3320 {
3321         struct rbd_device *rbd_dev =
3322             container_of(work, struct rbd_device, rq_work);
3323         struct request *rq, *next;
3324         LIST_HEAD(requests);
3325
3326         spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
3327         list_splice_init(&rbd_dev->rq_queue, &requests);
3328         spin_unlock_irq(&rbd_dev->lock);
3329
3330         list_for_each_entry_safe(rq, next, &requests, queuelist) {
3331                 list_del_init(&rq->queuelist);
3332                 rbd_handle_request(rbd_dev, rq);
3333         }
3334 }
3335
3336 /*
3337  * Called with q->queue_lock held and interrupts disabled, possibly on
3338  * the way to schedule().  Do not sleep here!
3339  */
3340 static void rbd_request_fn(struct request_queue *q)
3341 {
3342         struct rbd_device *rbd_dev = q->queuedata;
3343         struct request *rq;
3344         int queued = 0;
3345
3346         rbd_assert(rbd_dev);
3347
3348         while ((rq = blk_fetch_request(q))) {
3349                 /* Ignore any non-FS requests that filter through. */
3350                 if (rq->cmd_type != REQ_TYPE_FS) {
3351                         dout("%s: non-fs request type %d\n", __func__,
3352                                 (int) rq->cmd_type);
3353                         __blk_end_request_all(rq, 0);
3354                         continue;
3355                 }
3356
3357                 list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
3358                 queued++;
3359         }
3360
3361         if (queued)
3362                 queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
3363 }
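
/*
 * Editorial note: the two functions above are a hand-off pair.
 * rbd_request_fn() runs under q->queue_lock and so only moves requests
 * onto rbd_dev->rq_queue and kicks rq_wq; rbd_request_workfn() then
 * feeds them to rbd_handle_request(), which is free to block (it takes
 * header_rwsem and allocates an image request).
 */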
3364
3365 /*
3366  * A queue callback.  Makes sure that we don't create a bio that spans
3367  * across multiple osd objects.  One exception would be single-page
3368  * bios, which we handle later in bio_chain_clone_range().
3369  */
3370 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3371                           struct bio_vec *bvec)
3372 {
3373         struct rbd_device *rbd_dev = q->queuedata;
3374         sector_t sector_offset;
3375         sector_t sectors_per_obj;
3376         sector_t obj_sector_offset;
3377         int ret;
3378
3379         /*
3380          * The bio start sector is partition-relative, so offset it by
3381          * the partition start to make it relative to the enclosing
3382          * device, then find how far into its rbd object it falls.
3383          */
3384         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3385         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3386         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3387
3388         /*
3389          * Compute the number of bytes from that offset to the end
3390          * of the object.  Account for what's already used by the bio.
3391          */
3392         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3393         if (ret > bmd->bi_size)
3394                 ret -= bmd->bi_size;
3395         else
3396                 ret = 0;
3397
3398         /*
3399          * Don't send back more than was asked for.  And if the bio
3400          * was empty, let the whole thing through because:  "Note
3401          * that a block device *must* allow a single page to be
3402          * added to an empty bio."
3403          */
3404         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3405         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3406                 ret = (int) bvec->bv_len;
3407
3408         return ret;
3409 }
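
/*
 * Worked example (illustrative numbers): with the default 4 MiB
 * objects, obj_order is 22, so sectors_per_obj = 1 << (22 - 9) = 8192.
 * A bio starting at device sector 8000 lies 8000 sectors into its
 * object, leaving (8192 - 8000) << 9 = 98304 bytes before the object
 * boundary.  If the bio already carries 4096 bytes, 94208 bytes of
 * headroom remain and a page-sized bvec is accepted in full.
 */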
3410
3411 static void rbd_free_disk(struct rbd_device *rbd_dev)
3412 {
3413         struct gendisk *disk = rbd_dev->disk;
3414
3415         if (!disk)
3416                 return;
3417
3418         rbd_dev->disk = NULL;
3419         if (disk->flags & GENHD_FL_UP) {
3420                 del_gendisk(disk);
3421                 if (disk->queue)
3422                         blk_cleanup_queue(disk->queue);
3423         }
3424         put_disk(disk);
3425 }
3426
3427 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3428                                 const char *object_name,
3429                                 u64 offset, u64 length, void *buf)
3431 {
3432         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3433         struct rbd_obj_request *obj_request;
3434         struct page **pages = NULL;
3435         u32 page_count;
3436         size_t size;
3437         int ret;
3438
3439         page_count = (u32) calc_pages_for(offset, length);
3440         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3441         if (IS_ERR(pages))
3442                 return PTR_ERR(pages);
3443
3444         ret = -ENOMEM;
3445         obj_request = rbd_obj_request_create(object_name, offset, length,
3446                                                         OBJ_REQUEST_PAGES);
3447         if (!obj_request)
3448                 goto out;
3449
3450         obj_request->pages = pages;
3451         obj_request->page_count = page_count;
3452
3453         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3454                                                   obj_request);
3455         if (!obj_request->osd_req)
3456                 goto out;
3457
3458         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3459                                         offset, length, 0, 0);
3460         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3461                                         obj_request->pages,
3462                                         obj_request->length,
3463                                         obj_request->offset & ~PAGE_MASK,
3464                                         false, false);
3465         rbd_osd_req_format_read(obj_request);
3466
3467         ret = rbd_obj_request_submit(osdc, obj_request);
3468         if (ret)
3469                 goto out;
3470         ret = rbd_obj_request_wait(obj_request);
3471         if (ret)
3472                 goto out;
3473
3474         ret = obj_request->result;
3475         if (ret < 0)
3476                 goto out;
3477
3478         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3479         size = (size_t) obj_request->xferred;
3480         ceph_copy_from_page_vector(pages, buf, 0, size);
3481         rbd_assert(size <= (size_t)INT_MAX);
3482         ret = (int)size;
3483 out:
3484         if (obj_request)
3485                 rbd_obj_request_put(obj_request);
3486         else
3487                 ceph_release_page_vector(pages, page_count);
3488
3489         return ret;
3490 }
3491
3492 /*
3493  * Read the complete header for the given rbd device.  On successful
3494  * return, the rbd_dev->header field will contain up-to-date
3495  * information about the image.
3496  */
3497 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3498 {
3499         struct rbd_image_header_ondisk *ondisk = NULL;
3500         u32 snap_count = 0;
3501         u64 names_size = 0;
3502         u32 want_count;
3503         int ret;
3504
3505         /*
3506          * The complete header will include an array of its 64-bit
3507          * snapshot ids, followed by the names of those snapshots as
3508          * a contiguous block of NUL-terminated strings.  Note that
3509          * the number of snapshots could change by the time we read
3510          * it in, in which case we re-read it.
3511          */
3512         do {
3513                 size_t size;
3514
3515                 kfree(ondisk);
3516
3517                 size = sizeof (*ondisk);
3518                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3519                 size += names_size;
3520                 ondisk = kmalloc(size, GFP_KERNEL);
3521                 if (!ondisk)
3522                         return -ENOMEM;
3523
3524                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3525                                        0, size, ondisk);
3526                 if (ret < 0)
3527                         goto out;
3528                 if ((size_t)ret < size) {
3529                         ret = -ENXIO;
3530                 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3531                                 size, ret);
3532                         goto out;
3533                 }
3534                 if (!rbd_dev_ondisk_valid(ondisk)) {
3535                         ret = -ENXIO;
3536                         rbd_warn(rbd_dev, "invalid header");
3537                         goto out;
3538                 }
3539
3540                 names_size = le64_to_cpu(ondisk->snap_names_len);
3541                 want_count = snap_count;
3542                 snap_count = le32_to_cpu(ondisk->snap_count);
3543         } while (snap_count != want_count);
3544
3545         ret = rbd_header_from_disk(rbd_dev, ondisk);
3546 out:
3547         kfree(ondisk);
3548
3549         return ret;
3550 }
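
/*
 * Shape of the on-disk data read above (editorial sketch):
 *
 *      struct rbd_image_header_ondisk            fixed-size header
 *      struct rbd_image_snap_ondisk[snap_count]  one record per snapshot
 *      char snap_names[names_size]               NUL-terminated, packed
 *
 * snap_count and names_size are only known once a header has been read,
 * so the loop above retries until the count it sized its buffer for
 * matches the count it read back.
 */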
3551
3552 /*
3553  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3554  * has disappeared from the (just updated) snapshot context.
3555  */
3556 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3557 {
3558         u64 snap_id;
3559
3560         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3561                 return;
3562
3563         snap_id = rbd_dev->spec->snap_id;
3564         if (snap_id == CEPH_NOSNAP)
3565                 return;
3566
3567         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3568                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3569 }
3570
3571 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3572 {
3573         sector_t size;
3574         bool removing;
3575
3576         /*
3577          * Don't hold the lock while doing disk operations,
3578          * or lock ordering will conflict with the bdev mutex via:
3579          * rbd_add() -> blkdev_get() -> rbd_open()
3580          */
3581         spin_lock_irq(&rbd_dev->lock);
3582         removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3583         spin_unlock_irq(&rbd_dev->lock);
3584         /*
3585          * If the device is being removed, rbd_dev->disk has
3586          * been destroyed, so don't try to update its size
3587          */
3588         if (!removing) {
3589                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3590                 dout("setting size to %llu sectors", (unsigned long long)size);
3591                 set_capacity(rbd_dev->disk, size);
3592                 revalidate_disk(rbd_dev->disk);
3593         }
3594 }
3595
3596 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3597 {
3598         u64 mapping_size;
3599         int ret;
3600
3601         down_write(&rbd_dev->header_rwsem);
3602         mapping_size = rbd_dev->mapping.size;
3603
3604         ret = rbd_dev_header_info(rbd_dev);
3605         if (ret)
3606                 goto out;
3607
3608         /*
3609          * If there is a parent, see if it has disappeared due to the
3610          * mapped image getting flattened.
3611          */
3612         if (rbd_dev->parent) {
3613                 ret = rbd_dev_v2_parent_info(rbd_dev);
3614                 if (ret)
3615                         goto out;
3616         }
3617
3618         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3619                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3620                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3621         } else {
3622                 /* validate mapped snapshot's EXISTS flag */
3623                 rbd_exists_validate(rbd_dev);
3624         }
3625
3626 out:
3627         up_write(&rbd_dev->header_rwsem);
3628         if (!ret && mapping_size != rbd_dev->mapping.size)
3629                 rbd_dev_update_size(rbd_dev);
3630
3631         return ret;
3632 }
3633
3634 static int rbd_init_disk(struct rbd_device *rbd_dev)
3635 {
3636         struct gendisk *disk;
3637         struct request_queue *q;
3638         u64 segment_size;
3639
3640         /* create gendisk info */
3641         disk = alloc_disk(single_major ?
3642                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3643                           RBD_MINORS_PER_MAJOR);
3644         if (!disk)
3645                 return -ENOMEM;
3646
3647         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3648                  rbd_dev->dev_id);
3649         disk->major = rbd_dev->major;
3650         disk->first_minor = rbd_dev->minor;
3651         if (single_major)
3652                 disk->flags |= GENHD_FL_EXT_DEVT;
3653         disk->fops = &rbd_bd_ops;
3654         disk->private_data = rbd_dev;
3655
3656         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3657         if (!q)
3658                 goto out_disk;
3659
3660         /* We use the default size, but let's be explicit about it. */
3661         blk_queue_physical_block_size(q, SECTOR_SIZE);
3662
3663         /* set io sizes to object size */
3664         segment_size = rbd_obj_bytes(&rbd_dev->header);
3665         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3666         blk_queue_max_segment_size(q, segment_size);
3667         blk_queue_io_min(q, segment_size);
3668         blk_queue_io_opt(q, segment_size);
3669
3670         blk_queue_merge_bvec(q, rbd_merge_bvec);
3671         disk->queue = q;
3672
3673         q->queuedata = rbd_dev;
3674
3675         rbd_dev->disk = disk;
3676
3677         return 0;
3678 out_disk:
3679         put_disk(disk);
3680
3681         return -ENOMEM;
3682 }
3683
3684 /*
3685   sysfs
3686 */
3687
3688 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3689 {
3690         return container_of(dev, struct rbd_device, dev);
3691 }
3692
3693 static ssize_t rbd_size_show(struct device *dev,
3694                              struct device_attribute *attr, char *buf)
3695 {
3696         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3697
3698         return sprintf(buf, "%llu\n",
3699                 (unsigned long long)rbd_dev->mapping.size);
3700 }
3701
3702 /*
3703  * Note this shows the features for whatever's mapped, which is not
3704  * necessarily the base image.
3705  */
3706 static ssize_t rbd_features_show(struct device *dev,
3707                              struct device_attribute *attr, char *buf)
3708 {
3709         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3710
3711         return sprintf(buf, "0x%016llx\n",
3712                         (unsigned long long)rbd_dev->mapping.features);
3713 }
3714
3715 static ssize_t rbd_major_show(struct device *dev,
3716                               struct device_attribute *attr, char *buf)
3717 {
3718         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3719
3720         if (rbd_dev->major)
3721                 return sprintf(buf, "%d\n", rbd_dev->major);
3722
3723         return sprintf(buf, "(none)\n");
3724 }
3725
3726 static ssize_t rbd_minor_show(struct device *dev,
3727                               struct device_attribute *attr, char *buf)
3728 {
3729         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3730
3731         return sprintf(buf, "%d\n", rbd_dev->minor);
3732 }
3733
3734 static ssize_t rbd_client_id_show(struct device *dev,
3735                                   struct device_attribute *attr, char *buf)
3736 {
3737         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3738
3739         return sprintf(buf, "client%lld\n",
3740                         ceph_client_id(rbd_dev->rbd_client->client));
3741 }
3742
3743 static ssize_t rbd_pool_show(struct device *dev,
3744                              struct device_attribute *attr, char *buf)
3745 {
3746         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3747
3748         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3749 }
3750
3751 static ssize_t rbd_pool_id_show(struct device *dev,
3752                              struct device_attribute *attr, char *buf)
3753 {
3754         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3755
3756         return sprintf(buf, "%llu\n",
3757                         (unsigned long long) rbd_dev->spec->pool_id);
3758 }
3759
3760 static ssize_t rbd_name_show(struct device *dev,
3761                              struct device_attribute *attr, char *buf)
3762 {
3763         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3764
3765         if (rbd_dev->spec->image_name)
3766                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3767
3768         return sprintf(buf, "(unknown)\n");
3769 }
3770
3771 static ssize_t rbd_image_id_show(struct device *dev,
3772                              struct device_attribute *attr, char *buf)
3773 {
3774         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3775
3776         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3777 }
3778
3779 /*
3780  * Shows the name of the currently-mapped snapshot (or
3781  * RBD_SNAP_HEAD_NAME for the base image).
3782  */
3783 static ssize_t rbd_snap_show(struct device *dev,
3784                              struct device_attribute *attr,
3785                              char *buf)
3786 {
3787         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3788
3789         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3790 }
3791
3792 /*
3793  * For a v2 image, shows the chain of parent images, separated by empty
3794  * lines.  For v1 images or if there is no parent, shows "(no parent
3795  * image)".
3796  */
3797 static ssize_t rbd_parent_show(struct device *dev,
3798                                struct device_attribute *attr,
3799                                char *buf)
3800 {
3801         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3802         ssize_t count = 0;
3803
3804         if (!rbd_dev->parent)
3805                 return sprintf(buf, "(no parent image)\n");
3806
3807         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3808                 struct rbd_spec *spec = rbd_dev->parent_spec;
3809
3810                 count += sprintf(&buf[count], "%s"
3811                             "pool_id %llu\npool_name %s\n"
3812                             "image_id %s\nimage_name %s\n"
3813                             "snap_id %llu\nsnap_name %s\n"
3814                             "overlap %llu\n",
3815                             !count ? "" : "\n", /* first? */
3816                             spec->pool_id, spec->pool_name,
3817                             spec->image_id, spec->image_name ?: "(unknown)",
3818                             spec->snap_id, spec->snap_name,
3819                             rbd_dev->parent_overlap);
3820         }
3821
3822         return count;
3823 }
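
/*
 * Example "parent" output for a one-level chain (illustrative values):
 *
 *      pool_id 2
 *      pool_name rbd
 *      image_id 1028a4f5012e
 *      image_name base
 *      snap_id 4
 *      snap_name parent_snap
 *      overlap 4194304
 *
 * Additional ancestors, if any, follow as further blocks separated by
 * empty lines.
 */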
3824
3825 static ssize_t rbd_image_refresh(struct device *dev,
3826                                  struct device_attribute *attr,
3827                                  const char *buf,
3828                                  size_t size)
3829 {
3830         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3831         int ret;
3832
3833         ret = rbd_dev_refresh(rbd_dev);
3834         if (ret)
3835                 return ret;
3836
3837         return size;
3838 }
3839
3840 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3841 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3842 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3843 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3844 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3845 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3846 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3847 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3848 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3849 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3850 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3851 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3852
3853 static struct attribute *rbd_attrs[] = {
3854         &dev_attr_size.attr,
3855         &dev_attr_features.attr,
3856         &dev_attr_major.attr,
3857         &dev_attr_minor.attr,
3858         &dev_attr_client_id.attr,
3859         &dev_attr_pool.attr,
3860         &dev_attr_pool_id.attr,
3861         &dev_attr_name.attr,
3862         &dev_attr_image_id.attr,
3863         &dev_attr_current_snap.attr,
3864         &dev_attr_parent.attr,
3865         &dev_attr_refresh.attr,
3866         NULL
3867 };
3868
3869 static struct attribute_group rbd_attr_group = {
3870         .attrs = rbd_attrs,
3871 };
3872
3873 static const struct attribute_group *rbd_attr_groups[] = {
3874         &rbd_attr_group,
3875         NULL
3876 };
3877
3878 static void rbd_sysfs_dev_release(struct device *dev)
3879 {
3880 }
3881
3882 static struct device_type rbd_device_type = {
3883         .name           = "rbd",
3884         .groups         = rbd_attr_groups,
3885         .release        = rbd_sysfs_dev_release,
3886 };
3887
3888 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3889 {
3890         kref_get(&spec->kref);
3891
3892         return spec;
3893 }
3894
3895 static void rbd_spec_free(struct kref *kref);
3896 static void rbd_spec_put(struct rbd_spec *spec)
3897 {
3898         if (spec)
3899                 kref_put(&spec->kref, rbd_spec_free);
3900 }
3901
3902 static struct rbd_spec *rbd_spec_alloc(void)
3903 {
3904         struct rbd_spec *spec;
3905
3906         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3907         if (!spec)
3908                 return NULL;
3909
3910         spec->pool_id = CEPH_NOPOOL;
3911         spec->snap_id = CEPH_NOSNAP;
3912         kref_init(&spec->kref);
3913
3914         return spec;
3915 }
3916
3917 static void rbd_spec_free(struct kref *kref)
3918 {
3919         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3920
3921         kfree(spec->pool_name);
3922         kfree(spec->image_id);
3923         kfree(spec->image_name);
3924         kfree(spec->snap_name);
3925         kfree(spec);
3926 }
3927
3928 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3929                                 struct rbd_spec *spec)
3930 {
3931         struct rbd_device *rbd_dev;
3932
3933         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3934         if (!rbd_dev)
3935                 return NULL;
3936
3937         spin_lock_init(&rbd_dev->lock);
3938         INIT_LIST_HEAD(&rbd_dev->rq_queue);
3939         INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
3940         rbd_dev->flags = 0;
3941         atomic_set(&rbd_dev->parent_ref, 0);
3942         INIT_LIST_HEAD(&rbd_dev->node);
3943         init_rwsem(&rbd_dev->header_rwsem);
3944
3945         rbd_dev->spec = spec;
3946         rbd_dev->rbd_client = rbdc;
3947
3948         /* Initialize the layout used for all rbd requests */
3949
3950         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3951         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3952         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3953         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3954
3955         return rbd_dev;
3956 }
3957
3958 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3959 {
3960         rbd_put_client(rbd_dev->rbd_client);
3961         rbd_spec_put(rbd_dev->spec);
3962         kfree(rbd_dev);
3963 }
3964
3965 /*
3966  * Get the size and object order for an image snapshot, or if
3967  * snap_id is CEPH_NOSNAP, gets this information for the base
3968  * image.
3969  */
3970 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3971                                 u8 *order, u64 *snap_size)
3972 {
3973         __le64 snapid = cpu_to_le64(snap_id);
3974         int ret;
3975         struct {
3976                 u8 order;
3977                 __le64 size;
3978         } __attribute__ ((packed)) size_buf = { 0 };
3979
3980         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3981                                 "rbd", "get_size",
3982                                 &snapid, sizeof (snapid),
3983                                 &size_buf, sizeof (size_buf));
3984         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3985         if (ret < 0)
3986                 return ret;
3987         if (ret < sizeof (size_buf))
3988                 return -ERANGE;
3989
3990         if (order) {
3991                 *order = size_buf.order;
3992                 dout("  order %u", (unsigned int)*order);
3993         }
3994         *snap_size = le64_to_cpu(size_buf.size);
3995
3996         dout("  snap_id 0x%016llx snap_size = %llu\n",
3997                 (unsigned long long)snap_id,
3998                 (unsigned long long)*snap_size);
3999
4000         return 0;
4001 }
4002
4003 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4004 {
4005         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4006                                         &rbd_dev->header.obj_order,
4007                                         &rbd_dev->header.image_size);
4008 }
4009
4010 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4011 {
4012         void *reply_buf;
4013         int ret;
4014         void *p;
4015
4016         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4017         if (!reply_buf)
4018                 return -ENOMEM;
4019
4020         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4021                                 "rbd", "get_object_prefix", NULL, 0,
4022                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4023         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4024         if (ret < 0)
4025                 goto out;
4026
4027         p = reply_buf;
4028         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4029                                                 p + ret, NULL, GFP_NOIO);
4030         ret = 0;
4031
4032         if (IS_ERR(rbd_dev->header.object_prefix)) {
4033                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4034                 rbd_dev->header.object_prefix = NULL;
4035         } else {
4036                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4037         }
4038 out:
4039         kfree(reply_buf);
4040
4041         return ret;
4042 }
4043
4044 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4045                 u64 *snap_features)
4046 {
4047         __le64 snapid = cpu_to_le64(snap_id);
4048         struct {
4049                 __le64 features;
4050                 __le64 incompat;
4051         } __attribute__ ((packed)) features_buf = { 0 };
4052         u64 incompat;
4053         int ret;
4054
4055         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4056                                 "rbd", "get_features",
4057                                 &snapid, sizeof (snapid),
4058                                 &features_buf, sizeof (features_buf));
4059         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4060         if (ret < 0)
4061                 return ret;
4062         if (ret < sizeof (features_buf))
4063                 return -ERANGE;
4064
4065         incompat = le64_to_cpu(features_buf.incompat);
4066         if (incompat & ~RBD_FEATURES_SUPPORTED)
4067                 return -ENXIO;
4068
4069         *snap_features = le64_to_cpu(features_buf.features);
4070
4071         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4072                 (unsigned long long)snap_id,
4073                 (unsigned long long)*snap_features,
4074                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4075
4076         return 0;
4077 }
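
/*
 * Editorial note: the incompat mask is enforced, not just reported.
 * If an image uses any feature outside RBD_FEATURES_SUPPORTED the
 * helper above fails with -ENXIO, so such images are refused rather
 * than mapped incorrectly.
 */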
4078
4079 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4080 {
4081         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4082                                                 &rbd_dev->header.features);
4083 }
4084
4085 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4086 {
4087         struct rbd_spec *parent_spec;
4088         size_t size;
4089         void *reply_buf = NULL;
4090         __le64 snapid;
4091         void *p;
4092         void *end;
4093         u64 pool_id;
4094         char *image_id;
4095         u64 snap_id;
4096         u64 overlap;
4097         int ret;
4098
4099         parent_spec = rbd_spec_alloc();
4100         if (!parent_spec)
4101                 return -ENOMEM;
4102
4103         size = sizeof (__le64) +                                /* pool_id */
4104                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
4105                 sizeof (__le64) +                               /* snap_id */
4106                 sizeof (__le64);                                /* overlap */
4107         reply_buf = kmalloc(size, GFP_KERNEL);
4108         if (!reply_buf) {
4109                 ret = -ENOMEM;
4110                 goto out_err;
4111         }
4112
4113         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4114         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4115                                 "rbd", "get_parent",
4116                                 &snapid, sizeof (snapid),
4117                                 reply_buf, size);
4118         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4119         if (ret < 0)
4120                 goto out_err;
4121
4122         p = reply_buf;
4123         end = reply_buf + ret;
4124         ret = -ERANGE;
4125         ceph_decode_64_safe(&p, end, pool_id, out_err);
4126         if (pool_id == CEPH_NOPOOL) {
4127                 /*
4128                  * Either the parent never existed, or we have
4129                  * a record of it but the image got flattened so it no
4130                  * longer has a parent.  When the parent of a
4131                  * layered image disappears we immediately set the
4132                  * overlap to 0.  The effect of this is that all new
4133                  * requests will be treated as if the image had no
4134                  * parent.
4135                  */
4136                 if (rbd_dev->parent_overlap) {
4137                         rbd_dev->parent_overlap = 0;
4138                         smp_mb();
4139                         rbd_dev_parent_put(rbd_dev);
4140                         pr_info("%s: clone image has been flattened\n",
4141                                 rbd_dev->disk->disk_name);
4142                 }
4143
4144                 goto out;       /* No parent?  No problem. */
4145         }
4146
4147         /* The ceph file layout needs to fit pool id in 32 bits */
4148
4149         ret = -EIO;
4150         if (pool_id > (u64)U32_MAX) {
4151                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4152                         (unsigned long long)pool_id, U32_MAX);
4153                 goto out_err;
4154         }
4155
4156         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4157         if (IS_ERR(image_id)) {
4158                 ret = PTR_ERR(image_id);
4159                 goto out_err;
4160         }
4161         ceph_decode_64_safe(&p, end, snap_id, out_err);
4162         ceph_decode_64_safe(&p, end, overlap, out_err);
4163
4164         /*
4165          * The parent won't change (except when the clone is
4166          * flattened, which we handled above).  So we only need to
4167          * record the parent spec if we have not already done so.
4168          */
4169         if (!rbd_dev->parent_spec) {
4170                 parent_spec->pool_id = pool_id;
4171                 parent_spec->image_id = image_id;
4172                 parent_spec->snap_id = snap_id;
4173                 rbd_dev->parent_spec = parent_spec;
4174                 parent_spec = NULL;     /* rbd_dev now owns this */
4175         } else {
4176                 kfree(image_id);
4177         }
4178
4179         /*
4180          * We always update the parent overlap.  If it's zero we
4181          * treat it specially.
4182          */
4183         rbd_dev->parent_overlap = overlap;
4184         smp_mb();
4185         if (!overlap) {
4186
4187                 /* A null parent_spec indicates it's the initial probe */
4188
4189                 if (parent_spec) {
4190                         /*
4191                          * The overlap has become zero, so the clone
4192                          * must have been resized down to 0 at some
4193                          * point.  Treat this the same as a flatten.
4194                          */
4195                         rbd_dev_parent_put(rbd_dev);
4196                         pr_info("%s: clone image now standalone\n",
4197                                 rbd_dev->disk->disk_name);
4198                 } else {
4199                         /*
4200                          * For the initial probe, if we find the
4201                          * overlap is zero we just pretend there was
4202                          * no parent image.
4203                          */
4204                         rbd_warn(rbd_dev, "ignoring parent with overlap 0");
4205                 }
4206         }
4207 out:
4208         ret = 0;
4209 out_err:
4210         kfree(reply_buf);
4211         rbd_spec_put(parent_spec);
4212
4213         return ret;
4214 }
4215
4216 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4217 {
4218         struct {
4219                 __le64 stripe_unit;
4220                 __le64 stripe_count;
4221         } __attribute__ ((packed)) striping_info_buf = { 0 };
4222         size_t size = sizeof (striping_info_buf);
4223         void *p;
4224         u64 obj_size;
4225         u64 stripe_unit;
4226         u64 stripe_count;
4227         int ret;
4228
4229         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4230                                 "rbd", "get_stripe_unit_count", NULL, 0,
4231                                 (char *)&striping_info_buf, size);
4232         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4233         if (ret < 0)
4234                 return ret;
4235         if (ret < size)
4236                 return -ERANGE;
4237
4238         /*
4239          * We don't actually support the "fancy striping" feature
4240          * (STRIPINGV2) yet, but if the striping sizes are the
4241          * defaults the behavior is the same as before.  So find
4242          * out, and only fail if the image has non-default values.
4243          */
4244         ret = -EINVAL;
4245         obj_size = (u64)1 << rbd_dev->header.obj_order;
4246         p = &striping_info_buf;
4247         stripe_unit = ceph_decode_64(&p);
4248         if (stripe_unit != obj_size) {
4249                 rbd_warn(rbd_dev, "unsupported stripe unit (got %llu want %llu)",
4250                                 stripe_unit, obj_size);
4252                 return -EINVAL;
4253         }
4254         stripe_count = ceph_decode_64(&p);
4255         if (stripe_count != 1) {
4256                 rbd_warn(rbd_dev, "unsupported stripe count (got %llu want 1)",
4257                                 stripe_count);
4258                 return -EINVAL;
4259         }
4260         rbd_dev->header.stripe_unit = stripe_unit;
4261         rbd_dev->header.stripe_count = stripe_count;
4262
4263         return 0;
4264 }
4265
4266 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4267 {
4268         size_t image_id_size;
4269         char *image_id;
4270         void *p;
4271         void *end;
4272         size_t size;
4273         void *reply_buf = NULL;
4274         size_t len = 0;
4275         char *image_name = NULL;
4276         int ret;
4277
4278         rbd_assert(!rbd_dev->spec->image_name);
4279
4280         len = strlen(rbd_dev->spec->image_id);
4281         image_id_size = sizeof (__le32) + len;
4282         image_id = kmalloc(image_id_size, GFP_KERNEL);
4283         if (!image_id)
4284                 return NULL;
4285
4286         p = image_id;
4287         end = image_id + image_id_size;
4288         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4289
4290         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4291         reply_buf = kmalloc(size, GFP_KERNEL);
4292         if (!reply_buf)
4293                 goto out;
4294
4295         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4296                                 "rbd", "dir_get_name",
4297                                 image_id, image_id_size,
4298                                 reply_buf, size);
4299         if (ret < 0)
4300                 goto out;
4301         p = reply_buf;
4302         end = reply_buf + ret;
4303
4304         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4305         if (IS_ERR(image_name))
4306                 image_name = NULL;
4307         else
4308                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4309 out:
4310         kfree(reply_buf);
4311         kfree(image_id);
4312
4313         return image_name;
4314 }
4315
4316 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4317 {
4318         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4319         const char *snap_name;
4320         u32 which = 0;
4321
4322         /* Skip over names until we find the one we are looking for */
4323
4324         snap_name = rbd_dev->header.snap_names;
4325         while (which < snapc->num_snaps) {
4326                 if (!strcmp(name, snap_name))
4327                         return snapc->snaps[which];
4328                 snap_name += strlen(snap_name) + 1;
4329                 which++;
4330         }
4331         return CEPH_NOSNAP;
4332 }
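
/*
 * Worked example (illustrative): with snap_names = "one\0two\0" and
 * snapc->snaps = { 10, 12 }, a lookup of "two" fails to match "one",
 * advances strlen("one") + 1 bytes to "two", matches, and returns 12.
 */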
4333
4334 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4335 {
4336         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4337         u32 which;
4338         bool found = false;
4339         u64 snap_id;
4340
4341         for (which = 0; !found && which < snapc->num_snaps; which++) {
4342                 const char *snap_name;
4343
4344                 snap_id = snapc->snaps[which];
4345                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4346                 if (IS_ERR(snap_name)) {
4347                         /* ignore no-longer existing snapshots */
4348                         if (PTR_ERR(snap_name) == -ENOENT)
4349                                 continue;
4350                         else
4351                                 break;
4352                 }
4353                 found = !strcmp(name, snap_name);
4354                 kfree(snap_name);
4355         }
4356         return found ? snap_id : CEPH_NOSNAP;
4357 }
4358
4359 /*
4360  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4361  * no snapshot by that name is found, or if an error occurs.
4362  */
4363 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4364 {
4365         if (rbd_dev->image_format == 1)
4366                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4367
4368         return rbd_v2_snap_id_by_name(rbd_dev, name);
4369 }
4370
4371 /*
4372  * An image being mapped will have everything but the snap id.
4373  */
4374 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4375 {
4376         struct rbd_spec *spec = rbd_dev->spec;
4377
4378         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4379         rbd_assert(spec->image_id && spec->image_name);
4380         rbd_assert(spec->snap_name);
4381
4382         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4383                 u64 snap_id;
4384
4385                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4386                 if (snap_id == CEPH_NOSNAP)
4387                         return -ENOENT;
4388
4389                 spec->snap_id = snap_id;
4390         } else {
4391                 spec->snap_id = CEPH_NOSNAP;
4392         }
4393
4394         return 0;
4395 }
4396
4397 /*
4398  * A parent image will have all ids but none of the names.
4399  *
4400  * All names in an rbd spec are dynamically allocated.  It's OK if we
4401  * can't figure out the name for an image id.
4402  */
4403 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4404 {
4405         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4406         struct rbd_spec *spec = rbd_dev->spec;
4407         const char *pool_name;
4408         const char *image_name;
4409         const char *snap_name;
4410         int ret;
4411
4412         rbd_assert(spec->pool_id != CEPH_NOPOOL);
4413         rbd_assert(spec->image_id);
4414         rbd_assert(spec->snap_id != CEPH_NOSNAP);
4415
4416         /* Get the pool name; we have to make our own copy of this */
4417
4418         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4419         if (!pool_name) {
4420                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4421                 return -EIO;
4422         }
4423         pool_name = kstrdup(pool_name, GFP_KERNEL);
4424         if (!pool_name)
4425                 return -ENOMEM;
4426
4427         /* Fetch the image name; tolerate failure here */
4428
4429         image_name = rbd_dev_image_name(rbd_dev);
4430         if (!image_name)
4431                 rbd_warn(rbd_dev, "unable to get image name");
4432
4433         /* Fetch the snapshot name */
4434
4435         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4436         if (IS_ERR(snap_name)) {
4437                 ret = PTR_ERR(snap_name);
4438                 goto out_err;
4439         }
4440
4441         spec->pool_name = pool_name;
4442         spec->image_name = image_name;
4443         spec->snap_name = snap_name;
4444
4445         return 0;
4446
4447 out_err:
4448         kfree(image_name);
4449         kfree(pool_name);
4450         return ret;
4451 }
4452
4453 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4454 {
4455         size_t size;
4456         int ret;
4457         void *reply_buf;
4458         void *p;
4459         void *end;
4460         u64 seq;
4461         u32 snap_count;
4462         struct ceph_snap_context *snapc;
4463         u32 i;
4464
4465         /*
4466          * We'll need room for the seq value (maximum snapshot id),
4467          * snapshot count, and array of that many snapshot ids.
4468          * For now we have a fixed upper limit on the number we're
4469          * prepared to receive.
4470          */
4471         size = sizeof (__le64) + sizeof (__le32) +
4472                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4473         reply_buf = kzalloc(size, GFP_KERNEL);
4474         if (!reply_buf)
4475                 return -ENOMEM;
4476
4477         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4478                                 "rbd", "get_snapcontext", NULL, 0,
4479                                 reply_buf, size);
4480         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4481         if (ret < 0)
4482                 goto out;
4483
4484         p = reply_buf;
4485         end = reply_buf + ret;
4486         ret = -ERANGE;
4487         ceph_decode_64_safe(&p, end, seq, out);
4488         ceph_decode_32_safe(&p, end, snap_count, out);
4489
4490         /*
4491          * Make sure the reported number of snapshot ids wouldn't go
4492          * beyond the end of our buffer.  But before checking that,
4493          * make sure the computed size of the snapshot context we
4494          * allocate is representable in a size_t.
4495          */
4496         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4497                                  / sizeof (u64)) {
4498                 ret = -EINVAL;
4499                 goto out;
4500         }
4501         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4502                 goto out;
4503         ret = 0;
4504
4505         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4506         if (!snapc) {
4507                 ret = -ENOMEM;
4508                 goto out;
4509         }
4510         snapc->seq = seq;
4511         for (i = 0; i < snap_count; i++)
4512                 snapc->snaps[i] = ceph_decode_64(&p);
4513
4514         ceph_put_snap_context(rbd_dev->header.snapc);
4515         rbd_dev->header.snapc = snapc;
4516
4517         dout("  snap context seq = %llu, snap_count = %u\n",
4518                 (unsigned long long)seq, (unsigned int)snap_count);
4519 out:
4520         kfree(reply_buf);
4521
4522         return ret;
4523 }
4524
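/*
 * Illustrative sketch only (hypothetical values): the "get_snapcontext"
 * reply decoded above is a little-endian stream of
 *
 *      __le64 seq;                  highest snapshot id seen
 *      __le32 snap_count;
 *      __le64 snaps[snap_count];    snapshot ids
 *
 * e.g. seq = 5 with snapshots { 4, 2 } occupies 8 + 4 + 2 * 8 = 28
 * bytes of the reply buffer.
 */
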
4525 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4526                                         u64 snap_id)
4527 {
4528         size_t size;
4529         void *reply_buf;
4530         __le64 snapid;
4531         int ret;
4532         void *p;
4533         void *end;
4534         char *snap_name;
4535
4536         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4537         reply_buf = kmalloc(size, GFP_KERNEL);
4538         if (!reply_buf)
4539                 return ERR_PTR(-ENOMEM);
4540
4541         snapid = cpu_to_le64(snap_id);
4542         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4543                                 "rbd", "get_snapshot_name",
4544                                 &snapid, sizeof (snapid),
4545                                 reply_buf, size);
4546         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4547         if (ret < 0) {
4548                 snap_name = ERR_PTR(ret);
4549                 goto out;
4550         }
4551
4552         p = reply_buf;
4553         end = reply_buf + ret;
4554         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4555         if (IS_ERR(snap_name))
4556                 goto out;
4557
4558         dout("  snap_id 0x%016llx snap_name = %s\n",
4559                 (unsigned long long)snap_id, snap_name);
4560 out:
4561         kfree(reply_buf);
4562
4563         return snap_name;
4564 }
4565
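/*
 * A sketch of the reply handled above, for reference: ceph encodes a
 * string as a __le32 length followed by that many bytes (no NUL), so
 * a snapshot named "snap1" (hypothetical) arrives as
 *
 *      05 00 00 00  's' 'n' 'a' 'p' '1'
 *
 * and ceph_extract_encoded_string() returns a freshly allocated,
 * NUL-terminated copy of it.
 */
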
4566 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4567 {
4568         bool first_time = rbd_dev->header.object_prefix == NULL;
4569         int ret;
4570
4571         ret = rbd_dev_v2_image_size(rbd_dev);
4572         if (ret)
4573                 return ret;
4574
4575         if (first_time) {
4576                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4577                 if (ret)
4578                         return ret;
4579         }
4580
4581         ret = rbd_dev_v2_snap_context(rbd_dev);
4582         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4583
4584         return ret;
4585 }
4586
4587 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4588 {
4589         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4590
4591         if (rbd_dev->image_format == 1)
4592                 return rbd_dev_v1_header_info(rbd_dev);
4593
4594         return rbd_dev_v2_header_info(rbd_dev);
4595 }
4596
4597 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4598 {
4599         struct device *dev;
4600         int ret;
4601
4602         dev = &rbd_dev->dev;
4603         dev->bus = &rbd_bus_type;
4604         dev->type = &rbd_device_type;
4605         dev->parent = &rbd_root_dev;
4606         dev->release = rbd_dev_device_release;
4607         dev_set_name(dev, "%d", rbd_dev->dev_id);
4608         ret = device_register(dev);
4609
4610         return ret;
4611 }
4612
4613 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4614 {
4615         device_unregister(&rbd_dev->dev);
4616 }
4617
4618 /*
4619  * Get a unique rbd identifier for the given new rbd_dev, and add
4620  * the rbd_dev to the global list.
4621  */
4622 static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4623 {
4624         int new_dev_id;
4625
4626         new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4627                                     0, minor_to_rbd_dev_id(1 << MINORBITS),
4628                                     GFP_KERNEL);
4629         if (new_dev_id < 0)
4630                 return new_dev_id;
4631
4632         rbd_dev->dev_id = new_dev_id;
4633
4634         spin_lock(&rbd_dev_list_lock);
4635         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4636         spin_unlock(&rbd_dev_list_lock);
4637
4638         dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4639
4640         return 0;
4641 }
4642
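/*
 * Sketch of the id/minor relationship assumed above (the conversion
 * helpers are defined earlier in this file): with
 * RBD_SINGLE_MAJOR_PART_SHIFT of 4, dev id N corresponds to minor
 * N << 4, leaving 16 minors per mapped device for partitions; the
 * upper bound passed to ida_simple_get() is therefore the dev id of
 * the last representable minor.
 */
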
4643 /*
4644  * Remove an rbd_dev from the global list, and record that its
4645  * identifier is no longer in use.
4646  */
4647 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4648 {
4649         spin_lock(&rbd_dev_list_lock);
4650         list_del_init(&rbd_dev->node);
4651         spin_unlock(&rbd_dev_list_lock);
4652
4653         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4654
4655         dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4656 }
4657
4658 /*
4659  * Skips over white space at *buf, and updates *buf to point to the
4660  * first found non-space character (if any). Returns the length of
4661  * the token (string of non-white space characters) found.  Note
4662  * that *buf must be terminated with '\0'.
4663  */
4664 static inline size_t next_token(const char **buf)
4665 {
4666         /*
4667          * These are the characters that produce nonzero for
4668          * isspace() in the "C" and "POSIX" locales.
4669          */
4670         const char *spaces = " \f\n\r\t\v";
4671
4672         *buf += strspn(*buf, spaces);   /* Find start of token */
4673
4674         return strcspn(*buf, spaces);   /* Return token length */
4675 }
4676
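/*
 * Worked example (hypothetical input): with *buf pointing at
 * "  rbd foo", next_token() advances *buf past the two spaces to
 * "rbd foo" and returns 3, the length of "rbd".  Advancing *buf past
 * the token itself is left to the callers below.
 */
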
4677 /*
4678  * Finds the next token in *buf, and if the provided token buffer is
4679  * big enough, copies the found token into it.  The result, if
4680  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4681  * must be terminated with '\0' on entry.
4682  *
4683  * Returns the length of the token found (not including the '\0').
4684  * Return value will be 0 if no token is found, and it will be >=
4685  * token_size if the token would not fit.
4686  *
4687  * The *buf pointer will be updated to point beyond the end of the
4688  * found token.  Note that this occurs even if the token buffer is
4689  * too small to hold it.
4690  */
4691 static inline size_t copy_token(const char **buf,
4692                                 char *token,
4693                                 size_t token_size)
4694 {
4695         size_t len;
4696
4697         len = next_token(buf);
4698         if (len < token_size) {
4699                 memcpy(token, *buf, len);
4700                 *(token + len) = '\0';
4701         }
4702         *buf += len;
4703
4704         return len;
4705 }
4706
4707 /*
4708  * Finds the next token in *buf, dynamically allocates a buffer big
4709  * enough to hold a copy of it, and copies the token into the new
4710  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4711  * that a duplicate buffer is created even for a zero-length token.
4712  *
4713  * Returns a pointer to the newly-allocated duplicate, or a null
4714  * pointer if memory for the duplicate was not available.  If
4715  * the lenp argument is a non-null pointer, the length of the token
4716  * (not including the '\0') is returned in *lenp.
4717  *
4718  * If successful, the *buf pointer will be updated to point beyond
4719  * the end of the found token.
4720  *
4721  * Note: uses GFP_KERNEL for allocation.
4722  */
4723 static inline char *dup_token(const char **buf, size_t *lenp)
4724 {
4725         char *dup;
4726         size_t len;
4727
4728         len = next_token(buf);
4729         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4730         if (!dup)
4731                 return NULL;
4732         *(dup + len) = '\0';
4733         *buf += len;
4734
4735         if (lenp)
4736                 *lenp = len;
4737
4738         return dup;
4739 }
4740
4741 /*
4742  * Parse the options provided for an "rbd add" (i.e., rbd image
4743  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4744  * and the data written is passed here via a NUL-terminated buffer.
4745  * Returns 0 if successful or an error code otherwise.
4746  *
4747  * The information extracted from these options is recorded in
4748  * the other parameters which return dynamically-allocated
4749  * structures:
4750  *  ceph_opts
4751  *      The address of a pointer that will refer to a ceph options
4752  *      structure.  Caller must release the returned pointer using
4753  *      ceph_destroy_options() when it is no longer needed.
4754  *  rbd_opts
4755  *      Address of an rbd options pointer.  Fully initialized by
4756  *      this function; caller must release with kfree().
4757  *  spec
4758  *      Address of an rbd image specification pointer.  Fully
4759  *      initialized by this function based on parsed options.
4760  *      Caller must release with rbd_spec_put().
4761  *
4762  * The options passed take this form:
4763  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4764  * where:
4765  *  <mon_addrs>
4766  *      A comma-separated list of one or more monitor addresses.
4767  *      A monitor address is an ip address, optionally followed
4768  *      by a port number (separated by a colon).
4769  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4770  *  <options>
4771  *      A comma-separated list of ceph and/or rbd options.
4772  *  <pool_name>
4773  *      The name of the rados pool containing the rbd image.
4774  *  <image_name>
4775  *      The name of the image in that pool to map.
4776  *  <snap_name>
4777  *      An optional snapshot name.  If provided, the mapping will
4778  *      present data from the image at the time that snapshot was
4779  *      created.  The image head is used if no snapshot name is
4780  *      provided.  Snapshot mappings are always read-only.
4781  */
4782 static int rbd_add_parse_args(const char *buf,
4783                                 struct ceph_options **ceph_opts,
4784                                 struct rbd_options **opts,
4785                                 struct rbd_spec **rbd_spec)
4786 {
4787         size_t len;
4788         char *options;
4789         const char *mon_addrs;
4790         char *snap_name;
4791         size_t mon_addrs_size;
4792         struct rbd_spec *spec = NULL;
4793         struct rbd_options *rbd_opts = NULL;
4794         struct ceph_options *copts;
4795         int ret;
4796
4797         /* The first four tokens are required */
4798
4799         len = next_token(&buf);
4800         if (!len) {
4801                 rbd_warn(NULL, "no monitor address(es) provided");
4802                 return -EINVAL;
4803         }
4804         mon_addrs = buf;
4805         mon_addrs_size = len + 1;
4806         buf += len;
4807
4808         ret = -EINVAL;
4809         options = dup_token(&buf, NULL);
4810         if (!options)
4811                 return -ENOMEM;
4812         if (!*options) {
4813                 rbd_warn(NULL, "no options provided");
4814                 goto out_err;
4815         }
4816
4817         spec = rbd_spec_alloc();
4818         if (!spec)
4819                 goto out_mem;
4820
4821         spec->pool_name = dup_token(&buf, NULL);
4822         if (!spec->pool_name)
4823                 goto out_mem;
4824         if (!*spec->pool_name) {
4825                 rbd_warn(NULL, "no pool name provided");
4826                 goto out_err;
4827         }
4828
4829         spec->image_name = dup_token(&buf, NULL);
4830         if (!spec->image_name)
4831                 goto out_mem;
4832         if (!*spec->image_name) {
4833                 rbd_warn(NULL, "no image name provided");
4834                 goto out_err;
4835         }
4836
4837         /*
4838          * Snapshot name is optional; default is to use "-"
4839          * (indicating the head/no snapshot).
4840          */
4841         len = next_token(&buf);
4842         if (!len) {
4843                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4844                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4845         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4846                 ret = -ENAMETOOLONG;
4847                 goto out_err;
4848         }
4849         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4850         if (!snap_name)
4851                 goto out_mem;
4852         *(snap_name + len) = '\0';
4853         spec->snap_name = snap_name;
4854
4855         /* Initialize all rbd options to the defaults */
4856
4857         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4858         if (!rbd_opts)
4859                 goto out_mem;
4860
4861         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4862
4863         copts = ceph_parse_options(options, mon_addrs,
4864                                         mon_addrs + mon_addrs_size - 1,
4865                                         parse_rbd_opts_token, rbd_opts);
4866         if (IS_ERR(copts)) {
4867                 ret = PTR_ERR(copts);
4868                 goto out_err;
4869         }
4870         kfree(options);
4871
4872         *ceph_opts = copts;
4873         *opts = rbd_opts;
4874         *rbd_spec = spec;
4875
4876         return 0;
4877 out_mem:
4878         ret = -ENOMEM;
4879 out_err:
4880         kfree(rbd_opts);
4881         rbd_spec_put(spec);
4882         kfree(options);
4883
4884         return ret;
4885 }
4886
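/*
 * Example add request (all values hypothetical), as it would be
 * written to /sys/bus/rbd/add and parsed by the function above:
 *
 *      1.2.3.4:6789 name=admin,secret=AQBvbK... rbd foo snap1
 *
 * i.e. one monitor at 1.2.3.4:6789, ceph options "name=...,secret=...",
 * pool "rbd", image "foo", mapped read-only at snapshot "snap1".
 */
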
4887 /*
4888  * Return pool id (>= 0) or a negative error code.
4889  */
4890 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4891 {
4892         u64 newest_epoch;
4893         unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
4894         int tries = 0;
4895         int ret;
4896
4897 again:
4898         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
4899         if (ret == -ENOENT && tries++ < 1) {
4900                 ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
4901                                                &newest_epoch);
4902                 if (ret < 0)
4903                         return ret;
4904
4905                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
4906                         ceph_monc_request_next_osdmap(&rbdc->client->monc);
4907                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
4908                                                      newest_epoch, timeout);
4909                         goto again;
4910                 } else {
4911                         /* the osdmap we have is new enough */
4912                         return -ENOENT;
4913                 }
4914         }
4915
4916         return ret;
4917 }
4918
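/*
 * Note on the retry above: a pool created after this client last
 * received an osdmap would not be found on the first lookup, so on
 * -ENOENT the code asks the monitors for the newest map epoch, waits
 * for that map if ours is stale, and then retries the name lookup
 * exactly once.
 */
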
4919 /*
4920  * An rbd format 2 image has a unique identifier, distinct from the
4921  * name given to it by the user.  Internally, that identifier is
4922  * what's used to specify the names of objects related to the image.
4923  *
4924  * A special "rbd id" object is used to map an rbd image name to its
4925  * id.  If that object doesn't exist, then there is no v2 rbd image
4926  * with the supplied name.
4927  *
4928  * This function will record the given rbd_dev's image_id field if
4929  * it can be determined, and in that case will return 0.  If any
4930  * errors occur a negative errno will be returned and the rbd_dev's
4931  * image_id field will be unchanged (and should be NULL).
4932  */
4933 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4934 {
4935         int ret;
4936         size_t size;
4937         char *object_name;
4938         void *response;
4939         char *image_id;
4940
4941         /*
4942          * When probing a parent image, the image id is already
4943          * known (and the image name likely is not).  There's no
4944          * need to fetch the image id again in this case.  We
4945          * do still need to set the image format though.
4946          */
4947         if (rbd_dev->spec->image_id) {
4948                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4949
4950                 return 0;
4951         }
4952
4953         /*
4954          * First, see if the format 2 image id file exists, and if
4955          * so, get the image's persistent id from it.
4956          */
4957         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4958         object_name = kmalloc(size, GFP_NOIO);
4959         if (!object_name)
4960                 return -ENOMEM;
4961         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4962         dout("rbd id object name is %s\n", object_name);
4963
4964         /* Response will be an encoded string, which includes a length */
4965
4966         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4967         response = kzalloc(size, GFP_NOIO);
4968         if (!response) {
4969                 ret = -ENOMEM;
4970                 goto out;
4971         }
4972
4973         /* If it doesn't exist we'll assume it's a format 1 image */
4974
4975         ret = rbd_obj_method_sync(rbd_dev, object_name,
4976                                 "rbd", "get_id", NULL, 0,
4977                                 response, size);
4978         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4979         if (ret == -ENOENT) {
4980                 image_id = kstrdup("", GFP_KERNEL);
4981                 ret = image_id ? 0 : -ENOMEM;
4982                 if (!ret)
4983                         rbd_dev->image_format = 1;
4984         } else if (ret >= 0) {
4985                 void *p = response;
4986
4987                 image_id = ceph_extract_encoded_string(&p, p + ret,
4988                                                 NULL, GFP_NOIO);
4989                 ret = PTR_ERR_OR_ZERO(image_id);
4990                 if (!ret)
4991                         rbd_dev->image_format = 2;
4992         }
4993
4994         if (!ret) {
4995                 rbd_dev->spec->image_id = image_id;
4996                 dout("image_id is %s\n", image_id);
4997         }
4998 out:
4999         kfree(response);
5000         kfree(object_name);
5001
5002         return ret;
5003 }
5004
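/*
 * Illustration (hypothetical id): for a format 2 image named "foo",
 * the id object probed above is "rbd_id.foo" (assuming RBD_ID_PREFIX
 * is "rbd_id."), and its get_id reply is a ceph-encoded string such
 * as "10203abc4def", which becomes rbd_dev->spec->image_id.  When the
 * id object does not exist the image is treated as format 1 and the
 * recorded id stays "".
 */
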
5005 /*
5006  * Undo whatever state changes are made by v1 or v2 header info
5007  * call.
5008  */
5009 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5010 {
5011         struct rbd_image_header *header;
5012
5013         /* Drop parent reference unless it's already been done (or there is none) */
5014
5015         if (rbd_dev->parent_overlap)
5016                 rbd_dev_parent_put(rbd_dev);
5017
5018         /* Free dynamic fields from the header, then zero it out */
5019
5020         header = &rbd_dev->header;
5021         ceph_put_snap_context(header->snapc);
5022         kfree(header->snap_sizes);
5023         kfree(header->snap_names);
5024         kfree(header->object_prefix);
5025         memset(header, 0, sizeof (*header));
5026 }
5027
5028 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5029 {
5030         int ret;
5031
5032         ret = rbd_dev_v2_object_prefix(rbd_dev);
5033         if (ret)
5034                 goto out_err;
5035
5036         /*
5037          * Get and check the features for the image.  Currently the
5038          * features are assumed to never change.
5039          */
5040         ret = rbd_dev_v2_features(rbd_dev);
5041         if (ret)
5042                 goto out_err;
5043
5044         /* If the image supports fancy striping, get its parameters */
5045
5046         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5047                 ret = rbd_dev_v2_striping_info(rbd_dev);
5048                 if (ret < 0)
5049                         goto out_err;
5050         }
5051         /* No support for crypto or compression in format 2 images */
5052
5053         return 0;
5054 out_err:
5055         rbd_dev->header.features = 0;
5056         kfree(rbd_dev->header.object_prefix);
5057         rbd_dev->header.object_prefix = NULL;
5058
5059         return ret;
5060 }
5061
5062 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
5063 {
5064         struct rbd_device *parent = NULL;
5065         struct rbd_spec *parent_spec;
5066         struct rbd_client *rbdc;
5067         int ret;
5068
5069         if (!rbd_dev->parent_spec)
5070                 return 0;
5071         /*
5072          * We need to pass a reference to the client and the parent
5073          * spec when creating the parent rbd_dev.  Images related by
5074          * parent/child relationships always share both.
5075          */
5076         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
5077         rbdc = __rbd_get_client(rbd_dev->rbd_client);
5078
5079         ret = -ENOMEM;
5080         parent = rbd_dev_create(rbdc, parent_spec);
5081         if (!parent)
5082                 goto out_err;
5083
5084         ret = rbd_dev_image_probe(parent, false);
5085         if (ret < 0)
5086                 goto out_err;
5087         rbd_dev->parent = parent;
5088         atomic_set(&rbd_dev->parent_ref, 1);
5089
5090         return 0;
5091 out_err:
5092         if (parent) {
5093                 rbd_dev_unparent(rbd_dev);
5094                 kfree(rbd_dev->header_name);
5095                 rbd_dev_destroy(parent);
5096         } else {
5097                 rbd_put_client(rbdc);
5098                 rbd_spec_put(parent_spec);
5099         }
5100
5101         return ret;
5102 }
5103
5104 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5105 {
5106         int ret;
5107
5108         /* Get an id and fill in device name. */
5109
5110         ret = rbd_dev_id_get(rbd_dev);
5111         if (ret)
5112                 return ret;
5113
5114         BUILD_BUG_ON(DEV_NAME_LEN
5115                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5116         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5117
5118         /* Record our major and minor device numbers. */
5119
5120         if (!single_major) {
5121                 ret = register_blkdev(0, rbd_dev->name);
5122                 if (ret < 0)
5123                         goto err_out_id;
5124
5125                 rbd_dev->major = ret;
5126                 rbd_dev->minor = 0;
5127         } else {
5128                 rbd_dev->major = rbd_major;
5129                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5130         }
5131
5132         /* Set up the blkdev mapping. */
5133
5134         ret = rbd_init_disk(rbd_dev);
5135         if (ret)
5136                 goto err_out_blkdev;
5137
5138         ret = rbd_dev_mapping_set(rbd_dev);
5139         if (ret)
5140                 goto err_out_disk;
5141
5142         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5143         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5144
5145         rbd_dev->rq_wq = alloc_workqueue("%s", 0, 0, rbd_dev->disk->disk_name);
5146         if (!rbd_dev->rq_wq) {
5147                 ret = -ENOMEM;
5148                 goto err_out_mapping;
5149         }
5150
5151         ret = rbd_bus_add_dev(rbd_dev);
5152         if (ret)
5153                 goto err_out_workqueue;
5154
5155         /* Everything's ready.  Announce the disk to the world. */
5156
5157         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5158         add_disk(rbd_dev->disk);
5159
5160         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5161                 (unsigned long long) rbd_dev->mapping.size);
5162
5163         return ret;
5164
5165 err_out_workqueue:
5166         destroy_workqueue(rbd_dev->rq_wq);
5167         rbd_dev->rq_wq = NULL;
5168 err_out_mapping:
5169         rbd_dev_mapping_clear(rbd_dev);
5170 err_out_disk:
5171         rbd_free_disk(rbd_dev);
5172 err_out_blkdev:
5173         if (!single_major)
5174                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5175 err_out_id:
5176         rbd_dev_id_put(rbd_dev);
5178
5179         return ret;
5180 }
5181
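/*
 * Sketch of the result (hypothetical ids): a successful setup of
 * dev_id 2 produces disk "rbd2" with either a freshly registered
 * major (minor 0) or, with single_major, the shared major and a
 * minor derived from the dev id, followed by the "rbd2: added with
 * size 0x..." message above.
 */
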
5182 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5183 {
5184         struct rbd_spec *spec = rbd_dev->spec;
5185         size_t size;
5186
5187         /* Record the header object name for this rbd image. */
5188
5189         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5190
5191         if (rbd_dev->image_format == 1)
5192                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
5193         else
5194                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
5195
5196         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
5197         if (!rbd_dev->header_name)
5198                 return -ENOMEM;
5199
5200         if (rbd_dev->image_format == 1)
5201                 sprintf(rbd_dev->header_name, "%s%s",
5202                         spec->image_name, RBD_SUFFIX);
5203         else
5204                 sprintf(rbd_dev->header_name, "%s%s",
5205                         RBD_HEADER_PREFIX, spec->image_id);
5206         return 0;
5207 }
5208
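/*
 * Examples of the names built above (image name and id hypothetical,
 * prefix/suffix values assumed from rbd_types.h): a format 1 image
 * "foo" gets header object "foo.rbd", while a format 2 image with id
 * "10203abc4def" gets "rbd_header.10203abc4def".
 */
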
5209 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5210 {
5211         rbd_dev_unprobe(rbd_dev);
5212         kfree(rbd_dev->header_name);
5213         rbd_dev->header_name = NULL;
5214         rbd_dev->image_format = 0;
5215         kfree(rbd_dev->spec->image_id);
5216         rbd_dev->spec->image_id = NULL;
5217
5218         rbd_dev_destroy(rbd_dev);
5219 }
5220
5221 /*
5222  * Probe for the existence of the header object for the given rbd
5223  * device.  If this image is the one being mapped (i.e., not a
5224  * parent), initiate a watch on its header object before using that
5225  * object to get detailed information about the rbd image.
5226  */
5227 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5228 {
5229         int ret;
5230
5231         /*
5232          * Get the id from the image id object.  Unless there's an
5233          * error, rbd_dev->spec->image_id will be filled in with
5234          * a dynamically-allocated string, and rbd_dev->image_format
5235          * will be set to either 1 or 2.
5236          */
5237         ret = rbd_dev_image_id(rbd_dev);
5238         if (ret)
5239                 return ret;
5240
5241         ret = rbd_dev_header_name(rbd_dev);
5242         if (ret)
5243                 goto err_out_format;
5244
5245         if (mapping) {
5246                 ret = rbd_dev_header_watch_sync(rbd_dev);
5247                 if (ret)
5248                         goto out_header_name;
5249         }
5250
5251         ret = rbd_dev_header_info(rbd_dev);
5252         if (ret)
5253                 goto err_out_watch;
5254
5255         /*
5256          * If this image is the one being mapped, we have pool name and
5257          * id, image name and id, and snap name - need to fill snap id.
5258          * Otherwise this is a parent image, identified by pool, image
5259          * and snap ids - need to fill in names for those ids.
5260          */
5261         if (mapping)
5262                 ret = rbd_spec_fill_snap_id(rbd_dev);
5263         else
5264                 ret = rbd_spec_fill_names(rbd_dev);
5265         if (ret)
5266                 goto err_out_probe;
5267
5268         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5269                 ret = rbd_dev_v2_parent_info(rbd_dev);
5270                 if (ret)
5271                         goto err_out_probe;
5272
5273                 /*
5274                  * Need to warn users if this image is the one being
5275                  * mapped and has a parent.
5276                  */
5277                 if (mapping && rbd_dev->parent_spec)
5278                         rbd_warn(rbd_dev,
5279                                  "WARNING: kernel layering is EXPERIMENTAL!");
5280         }
5281
5282         ret = rbd_dev_probe_parent(rbd_dev);
5283         if (ret)
5284                 goto err_out_probe;
5285
5286         dout("discovered format %u image, header name is %s\n",
5287                 rbd_dev->image_format, rbd_dev->header_name);
5288         return 0;
5289
5290 err_out_probe:
5291         rbd_dev_unprobe(rbd_dev);
5292 err_out_watch:
5293         if (mapping)
5294                 rbd_dev_header_unwatch_sync(rbd_dev);
5295 out_header_name:
5296         kfree(rbd_dev->header_name);
5297         rbd_dev->header_name = NULL;
5298 err_out_format:
5299         rbd_dev->image_format = 0;
5300         kfree(rbd_dev->spec->image_id);
5301         rbd_dev->spec->image_id = NULL;
5302         return ret;
5303 }
5304
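/*
 * Probe order, for reference: image id -> header object name ->
 * (watch, when mapping) -> header info -> snap id or names ->
 * parent info for layered images -> recursive parent probe.  Each
 * error label above unwinds exactly the steps that preceded it.
 */
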
5305 static ssize_t do_rbd_add(struct bus_type *bus,
5306                           const char *buf,
5307                           size_t count)
5308 {
5309         struct rbd_device *rbd_dev = NULL;
5310         struct ceph_options *ceph_opts = NULL;
5311         struct rbd_options *rbd_opts = NULL;
5312         struct rbd_spec *spec = NULL;
5313         struct rbd_client *rbdc;
5314         bool read_only;
5315         int rc = -ENOMEM;
5316
5317         if (!try_module_get(THIS_MODULE))
5318                 return -ENODEV;
5319
5320         /* parse add command */
5321         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5322         if (rc < 0)
5323                 goto err_out_module;
5324         read_only = rbd_opts->read_only;
5325         kfree(rbd_opts);
5326         rbd_opts = NULL;        /* done with this */
5327
5328         rbdc = rbd_get_client(ceph_opts);
5329         if (IS_ERR(rbdc)) {
5330                 rc = PTR_ERR(rbdc);
5331                 goto err_out_args;
5332         }
5333
5334         /* pick the pool */
5335         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
5336         if (rc < 0)
5337                 goto err_out_client;
5338         spec->pool_id = (u64)rc;
5339
5340         /* The ceph file layout needs to fit pool id in 32 bits */
5341
5342         if (spec->pool_id > (u64)U32_MAX) {
5343                 rbd_warn(NULL, "pool id too large (%llu > %u)",
5344                                 (unsigned long long)spec->pool_id, U32_MAX);
5345                 rc = -EIO;
5346                 goto err_out_client;
5347         }
5348
5349         rbd_dev = rbd_dev_create(rbdc, spec);
5350         if (!rbd_dev)
5351                 goto err_out_client;
5352         rbdc = NULL;            /* rbd_dev now owns this */
5353         spec = NULL;            /* rbd_dev now owns this */
5354
5355         rc = rbd_dev_image_probe(rbd_dev, true);
5356         if (rc < 0)
5357                 goto err_out_rbd_dev;
5358
5359         /* If we are mapping a snapshot it must be marked read-only */
5360
5361         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5362                 read_only = true;
5363         rbd_dev->mapping.read_only = read_only;
5364
5365         rc = rbd_dev_device_setup(rbd_dev);
5366         if (rc) {
5367                 /*
5368                  * rbd_dev_header_unwatch_sync() can't be moved into
5369                  * rbd_dev_image_release() without refactoring, see
5370                  * commit 1f3ef78861ac.
5371                  */
5372                 rbd_dev_header_unwatch_sync(rbd_dev);
5373                 rbd_dev_image_release(rbd_dev);
5374                 goto err_out_module;
5375         }
5376
5377         return count;
5378
5379 err_out_rbd_dev:
5380         rbd_dev_destroy(rbd_dev);
5381 err_out_client:
5382         rbd_put_client(rbdc);
5383 err_out_args:
5384         rbd_spec_put(spec);
5385 err_out_module:
5386         module_put(THIS_MODULE);
5387
5388         dout("Error adding device %s\n", buf);
5389
5390         return (ssize_t)rc;
5391 }
5392
5393 static ssize_t rbd_add(struct bus_type *bus,
5394                        const char *buf,
5395                        size_t count)
5396 {
5397         if (single_major)
5398                 return -EINVAL;
5399
5400         return do_rbd_add(bus, buf, count);
5401 }
5402
5403 static ssize_t rbd_add_single_major(struct bus_type *bus,
5404                                     const char *buf,
5405                                     size_t count)
5406 {
5407         return do_rbd_add(bus, buf, count);
5408 }
5409
5410 static void rbd_dev_device_release(struct device *dev)
5411 {
5412         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5413
5414         destroy_workqueue(rbd_dev->rq_wq);
5415         rbd_free_disk(rbd_dev);
5416         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5417         rbd_dev_mapping_clear(rbd_dev);
5418         if (!single_major)
5419                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5420         rbd_dev_id_put(rbd_dev);
5422 }
5423
5424 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5425 {
5426         while (rbd_dev->parent) {
5427                 struct rbd_device *first = rbd_dev;
5428                 struct rbd_device *second = first->parent;
5429                 struct rbd_device *third;
5430
5431                 /*
5432                  * Follow to the parent with no grandparent and
5433                  * remove it.
5434                  */
5435                 while (second && (third = second->parent)) {
5436                         first = second;
5437                         second = third;
5438                 }
5439                 rbd_assert(second);
5440                 rbd_dev_image_release(second);
5441                 first->parent = NULL;
5442                 first->parent_overlap = 0;
5443
5444                 rbd_assert(first->parent_spec);
5445                 rbd_spec_put(first->parent_spec);
5446                 first->parent_spec = NULL;
5447         }
5448 }
5449
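/*
 * Worked example (hypothetical chain): for clone a -> parent b ->
 * grandparent c, the loop above releases c first and clears b's
 * parent pointer, overlap and parent spec, then on the next pass
 * releases b, leaving a with no parent.
 */
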
5450 static ssize_t do_rbd_remove(struct bus_type *bus,
5451                              const char *buf,
5452                              size_t count)
5453 {
5454         struct rbd_device *rbd_dev = NULL;
5455         struct list_head *tmp;
5456         int dev_id;
5457         unsigned long ul;
5458         bool already = false;
5459         int ret;
5460
5461         ret = kstrtoul(buf, 10, &ul);
5462         if (ret)
5463                 return ret;
5464
5465         /* convert to int; abort if we lost anything in the conversion */
5466         dev_id = (int)ul;
5467         if (dev_id != ul)
5468                 return -EINVAL;
5469
5470         ret = -ENOENT;
5471         spin_lock(&rbd_dev_list_lock);
5472         list_for_each(tmp, &rbd_dev_list) {
5473                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5474                 if (rbd_dev->dev_id == dev_id) {
5475                         ret = 0;
5476                         break;
5477                 }
5478         }
5479         if (!ret) {
5480                 spin_lock_irq(&rbd_dev->lock);
5481                 if (rbd_dev->open_count)
5482                         ret = -EBUSY;
5483                 else
5484                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5485                                                         &rbd_dev->flags);
5486                 spin_unlock_irq(&rbd_dev->lock);
5487         }
5488         spin_unlock(&rbd_dev_list_lock);
5489         if (ret < 0 || already)
5490                 return ret;
5491
5492         rbd_dev_header_unwatch_sync(rbd_dev);
5493         /*
5494          * flush remaining watch callbacks - these must be complete
5495          * before the osd_client is shutdown
5496          */
5497         dout("%s: flushing notifies\n", __func__);
5498         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5499
5500         /*
5501          * Don't free anything from rbd_dev->disk until after all
5502          * notifies are completely processed. Otherwise
5503          * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5504          * in a potential use after free of rbd_dev->disk or rbd_dev.
5505          */
5506         rbd_bus_del_dev(rbd_dev);
5507         rbd_dev_image_release(rbd_dev);
5508         module_put(THIS_MODULE);
5509
5510         return count;
5511 }
5512
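/*
 * Example (hypothetical dev id): "echo 2 > /sys/bus/rbd/remove"
 * reaches this point with dev_id 2.  The write fails with -EBUSY
 * while the device is held open and with -ENOENT when no mapping
 * with that id exists.
 */
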
5513 static ssize_t rbd_remove(struct bus_type *bus,
5514                           const char *buf,
5515                           size_t count)
5516 {
5517         if (single_major)
5518                 return -EINVAL;
5519
5520         return do_rbd_remove(bus, buf, count);
5521 }
5522
5523 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5524                                        const char *buf,
5525                                        size_t count)
5526 {
5527         return do_rbd_remove(bus, buf, count);
5528 }
5529
5530 /*
5531  * create control files in sysfs
5532  * /sys/bus/rbd/...
5533  */
5534 static int rbd_sysfs_init(void)
5535 {
5536         int ret;
5537
5538         ret = device_register(&rbd_root_dev);
5539         if (ret < 0)
5540                 return ret;
5541
5542         ret = bus_register(&rbd_bus_type);
5543         if (ret < 0)
5544                 device_unregister(&rbd_root_dev);
5545
5546         return ret;
5547 }
5548
5549 static void rbd_sysfs_cleanup(void)
5550 {
5551         bus_unregister(&rbd_bus_type);
5552         device_unregister(&rbd_root_dev);
5553 }
5554
5555 static int rbd_slab_init(void)
5556 {
5557         rbd_assert(!rbd_img_request_cache);
5558         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5559                                         sizeof (struct rbd_img_request),
5560                                         __alignof__(struct rbd_img_request),
5561                                         0, NULL);
5562         if (!rbd_img_request_cache)
5563                 return -ENOMEM;
5564
5565         rbd_assert(!rbd_obj_request_cache);
5566         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5567                                         sizeof (struct rbd_obj_request),
5568                                         __alignof__(struct rbd_obj_request),
5569                                         0, NULL);
5570         if (!rbd_obj_request_cache)
5571                 goto out_err;
5572
5573         rbd_assert(!rbd_segment_name_cache);
5574         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5575                                         CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5576         if (rbd_segment_name_cache)
5577                 return 0;
5578 out_err:
5579         if (rbd_obj_request_cache) {
5580                 kmem_cache_destroy(rbd_obj_request_cache);
5581                 rbd_obj_request_cache = NULL;
5582         }
5583
5584         kmem_cache_destroy(rbd_img_request_cache);
5585         rbd_img_request_cache = NULL;
5586
5587         return -ENOMEM;
5588 }
5589
5590 static void rbd_slab_exit(void)
5591 {
5592         rbd_assert(rbd_segment_name_cache);
5593         kmem_cache_destroy(rbd_segment_name_cache);
5594         rbd_segment_name_cache = NULL;
5595
5596         rbd_assert(rbd_obj_request_cache);
5597         kmem_cache_destroy(rbd_obj_request_cache);
5598         rbd_obj_request_cache = NULL;
5599
5600         rbd_assert(rbd_img_request_cache);
5601         kmem_cache_destroy(rbd_img_request_cache);
5602         rbd_img_request_cache = NULL;
5603 }
5604
5605 static int __init rbd_init(void)
5606 {
5607         int rc;
5608
5609         if (!libceph_compatible(NULL)) {
5610                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5611                 return -EINVAL;
5612         }
5613
5614         rc = rbd_slab_init();
5615         if (rc)
5616                 return rc;
5617
5618         if (single_major) {
5619                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5620                 if (rbd_major < 0) {
5621                         rc = rbd_major;
5622                         goto err_out_slab;
5623                 }
5624         }
5625
5626         rc = rbd_sysfs_init();
5627         if (rc)
5628                 goto err_out_blkdev;
5629
5630         if (single_major)
5631                 pr_info("loaded (major %d)\n", rbd_major);
5632         else
5633                 pr_info("loaded\n");
5634
5635         return 0;
5636
5637 err_out_blkdev:
5638         if (single_major)
5639                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5640 err_out_slab:
5641         rbd_slab_exit();
5642         return rc;
5643 }
5644
5645 static void __exit rbd_exit(void)
5646 {
5647         ida_destroy(&rbd_dev_id_ida);
5648         rbd_sysfs_cleanup();
5649         if (single_major)
5650                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5651         rbd_slab_exit();
5652 }
5653
5654 module_init(rbd_init);
5655 module_exit(rbd_exit);
5656
5657 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5658 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5659 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5660 /* following authorship retained from original osdblk.c */
5661 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5662
5663 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5664 MODULE_LICENSE("GPL");