1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * The basic unit of block I/O is a sector.  It is interpreted in a
55  * number of contexts in Linux (blk, bio, genhd), but the default is
56  * universally 512 bytes.  These symbols are just slightly more
57  * meaningful than the bare numbers they represent.
58  */
59 #define SECTOR_SHIFT    9
60 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
61
62 /*
63  * Increment the given counter and return its updated value.
64  * If the counter is already 0 it is not incremented.
65  * If the counter is already at its maximum value, -EINVAL is
66  * returned without updating it.
67  */
68 static int atomic_inc_return_safe(atomic_t *v)
69 {
70         unsigned int counter;
71
72         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
73         if (counter <= (unsigned int)INT_MAX)
74                 return (int)counter;
75
76         atomic_dec(v);
77
78         return -EINVAL;
79 }
80
81 /* Decrement the counter.  Return the resulting value, or -EINVAL */
82 static int atomic_dec_return_safe(atomic_t *v)
83 {
84         int counter;
85
86         counter = atomic_dec_return(v);
87         if (counter >= 0)
88                 return counter;
89
90         atomic_inc(v);
91
92         return -EINVAL;
93 }
94
95 #define RBD_DRV_NAME "rbd"
96
97 #define RBD_MINORS_PER_MAJOR            256
98 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
99
100 #define RBD_MAX_PARENT_CHAIN_LEN        16
101
102 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
103 #define RBD_MAX_SNAP_NAME_LEN   \
104                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
105
106 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
107
108 #define RBD_SNAP_HEAD_NAME      "-"
109
110 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
111
112 /* This allows a single page to hold an image name sent by OSD */
113 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
114 #define RBD_IMAGE_ID_LEN_MAX    64
115
116 #define RBD_OBJ_PREFIX_LEN_MAX  64
117
118 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
119 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
120
121 /* Feature bits */
122
123 #define RBD_FEATURE_LAYERING            (1ULL<<0)
124 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
125 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
126 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
127
128 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
129                                  RBD_FEATURE_STRIPINGV2 |       \
130                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
131                                  RBD_FEATURE_DATA_POOL)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the ids in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the ids associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190
191         const char      *image_id;
192         const char      *image_name;
193
194         u64             snap_id;
195         const char      *snap_name;
196
197         struct kref     kref;
198 };
199
200 /*
201  * an instance of the client.  multiple devices may share an rbd client.
202  */
203 struct rbd_client {
204         struct ceph_client      *client;
205         struct kref             kref;
206         struct list_head        node;
207 };
208
209 struct rbd_img_request;
210 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
211
212 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
213
214 struct rbd_obj_request;
215 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
219 };
220
221 enum obj_operation_type {
222         OBJ_OP_WRITE,
223         OBJ_OP_READ,
224         OBJ_OP_DISCARD,
225 };
226
227 enum obj_req_flags {
228         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
229         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
230         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
231         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
232 };
233
234 struct rbd_obj_request {
235         u64                     object_no;
236         u64                     offset;         /* object start byte */
237         u64                     length;         /* bytes from offset */
238         unsigned long           flags;
239
240         /*
241          * An object request associated with an image will have its
242          * img_data flag set; a standalone object request will not.
243          *
244          * A standalone object request will have which == BAD_WHICH
245          * and a null obj_request pointer.
246          *
247          * An object request initiated in support of a layered image
248          * object (to check for its existence before a write) will
249          * have which == BAD_WHICH and a non-null obj_request pointer.
250          *
251          * Finally, an object request for rbd image data will have
252          * which != BAD_WHICH, and will have a non-null img_request
253          * pointer.  The value of which will be in the range
254          * 0..(img_request->obj_request_count-1).
255          */
256         union {
257                 struct rbd_obj_request  *obj_request;   /* STAT op */
258                 struct {
259                         struct rbd_img_request  *img_request;
260                         u64                     img_offset;
261                         /* links for img_request->obj_requests list */
262                         struct list_head        links;
263                 };
264         };
265         u32                     which;          /* posn in image request list */
266
267         enum obj_request_type   type;
268         union {
269                 struct bio      *bio_list;
270                 struct {
271                         struct page     **pages;
272                         u32             page_count;
273                 };
274         };
275         struct page             **copyup_pages;
276         u32                     copyup_page_count;
277
278         struct ceph_osd_request *osd_req;
279
280         u64                     xferred;        /* bytes transferred */
281         int                     result;
282
283         rbd_obj_callback_t      callback;
284         struct completion       completion;
285
286         struct kref             kref;
287 };
288
289 enum img_req_flags {
290         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
291         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
292         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
293         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
294 };
295
296 struct rbd_img_request {
297         struct rbd_device       *rbd_dev;
298         u64                     offset; /* starting image byte offset */
299         u64                     length; /* byte count from offset */
300         unsigned long           flags;
301         union {
302                 u64                     snap_id;        /* for reads */
303                 struct ceph_snap_context *snapc;        /* for writes */
304         };
305         union {
306                 struct request          *rq;            /* block request */
307                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
308         };
309         struct page             **copyup_pages;
310         u32                     copyup_page_count;
311         spinlock_t              completion_lock;/* protects next_completion */
312         u32                     next_completion;
313         rbd_img_callback_t      callback;
314         u64                     xferred;/* aggregate bytes transferred */
315         int                     result; /* first nonzero obj_request result */
316
317         u32                     obj_request_count;
318         struct list_head        obj_requests;   /* rbd_obj_request structs */
319
320         struct kref             kref;
321 };
322
323 #define for_each_obj_request(ireq, oreq) \
324         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
325 #define for_each_obj_request_from(ireq, oreq) \
326         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
327 #define for_each_obj_request_safe(ireq, oreq, n) \
328         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
329
330 enum rbd_watch_state {
331         RBD_WATCH_STATE_UNREGISTERED,
332         RBD_WATCH_STATE_REGISTERED,
333         RBD_WATCH_STATE_ERROR,
334 };
335
336 enum rbd_lock_state {
337         RBD_LOCK_STATE_UNLOCKED,
338         RBD_LOCK_STATE_LOCKED,
339         RBD_LOCK_STATE_RELEASING,
340 };
341
342 /* WatchNotify::ClientId */
343 struct rbd_client_id {
344         u64 gid;
345         u64 handle;
346 };
347
348 struct rbd_mapping {
349         u64                     size;
350         u64                     features;
351         bool                    read_only;
352 };
353
354 /*
355  * a single device
356  */
357 struct rbd_device {
358         int                     dev_id;         /* blkdev unique id */
359
360         int                     major;          /* blkdev assigned major */
361         int                     minor;
362         struct gendisk          *disk;          /* blkdev's gendisk and rq */
363
364         u32                     image_format;   /* Either 1 or 2 */
365         struct rbd_client       *rbd_client;
366
367         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
368
369         spinlock_t              lock;           /* queue, flags, open_count */
370
371         struct rbd_image_header header;
372         unsigned long           flags;          /* possibly lock protected */
373         struct rbd_spec         *spec;
374         struct rbd_options      *opts;
375         char                    *config_info;   /* add{,_single_major} string */
376
377         struct ceph_object_id   header_oid;
378         struct ceph_object_locator header_oloc;
379
380         struct ceph_file_layout layout;         /* used for all rbd requests */
381
382         struct mutex            watch_mutex;
383         enum rbd_watch_state    watch_state;
384         struct ceph_osd_linger_request *watch_handle;
385         u64                     watch_cookie;
386         struct delayed_work     watch_dwork;
387
388         struct rw_semaphore     lock_rwsem;
389         enum rbd_lock_state     lock_state;
390         struct rbd_client_id    owner_cid;
391         struct work_struct      acquired_lock_work;
392         struct work_struct      released_lock_work;
393         struct delayed_work     lock_dwork;
394         struct work_struct      unlock_work;
395         wait_queue_head_t       lock_waitq;
396
397         struct workqueue_struct *task_wq;
398
399         struct rbd_spec         *parent_spec;
400         u64                     parent_overlap;
401         atomic_t                parent_ref;
402         struct rbd_device       *parent;
403
404         /* Block layer tags. */
405         struct blk_mq_tag_set   tag_set;
406
407         /* protects updating the header */
408         struct rw_semaphore     header_rwsem;
409
410         struct rbd_mapping      mapping;
411
412         struct list_head        node;
413
414         /* sysfs related */
415         struct device           dev;
416         unsigned long           open_count;     /* protected by lock */
417 };
418
419 /*
420  * Flag bits for rbd_dev->flags:
421  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
422  *   by rbd_dev->lock
423  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
424  */
425 enum rbd_dev_flags {
426         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
427         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
428         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
429 };
430
431 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
432
433 static LIST_HEAD(rbd_dev_list);    /* devices */
434 static DEFINE_SPINLOCK(rbd_dev_list_lock);
435
436 static LIST_HEAD(rbd_client_list);              /* clients */
437 static DEFINE_SPINLOCK(rbd_client_list_lock);
438
439 /* Slab caches for frequently-allocated structures */
440
441 static struct kmem_cache        *rbd_img_request_cache;
442 static struct kmem_cache        *rbd_obj_request_cache;
443
444 static int rbd_major;
445 static DEFINE_IDA(rbd_dev_id_ida);
446
447 static struct workqueue_struct *rbd_wq;
448
449 /*
450  * Default to false for now, as single-major requires version 0.75
451  * or later of the userspace rbd utility.
452  */
453 static bool single_major = false;
454 module_param(single_major, bool, S_IRUGO);
455 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
456
457 static int rbd_img_request_submit(struct rbd_img_request *img_request);
458
459 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
460                        size_t count);
461 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
462                           size_t count);
463 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
464                                     size_t count);
465 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
466                                        size_t count);
467 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
468 static void rbd_spec_put(struct rbd_spec *spec);
469
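/*
 * In single-major mode all rbd devices share one major number; each
 * device gets 2^RBD_SINGLE_MAJOR_PART_SHIFT (i.e. 16) consecutive
 * minors, so any partition minor maps back to the same dev_id.
 */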
470 static int rbd_dev_id_to_minor(int dev_id)
471 {
472         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
473 }
474
475 static int minor_to_rbd_dev_id(int minor)
476 {
477         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
478 }
479
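/*
 * The exclusive lock is only used for writable mappings of the image
 * head; snapshots and read-only mappings never take it.
 */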
480 static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
481 {
482         return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
483                rbd_dev->spec->snap_id == CEPH_NOSNAP &&
484                !rbd_dev->mapping.read_only;
485 }
486
487 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
488 {
489         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
490                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
491 }
492
493 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
494 {
495         bool is_lock_owner;
496
497         down_read(&rbd_dev->lock_rwsem);
498         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
499         up_read(&rbd_dev->lock_rwsem);
500         return is_lock_owner;
501 }
502
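/* sysfs bus attribute: report the feature bits this client implementation supports */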
503 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
504 {
505         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
506 }
507
508 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
509 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
510 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
511 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
512 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
513
514 static struct attribute *rbd_bus_attrs[] = {
515         &bus_attr_add.attr,
516         &bus_attr_remove.attr,
517         &bus_attr_add_single_major.attr,
518         &bus_attr_remove_single_major.attr,
519         &bus_attr_supported_features.attr,
520         NULL,
521 };
522
523 static umode_t rbd_bus_is_visible(struct kobject *kobj,
524                                   struct attribute *attr, int index)
525 {
526         if (!single_major &&
527             (attr == &bus_attr_add_single_major.attr ||
528              attr == &bus_attr_remove_single_major.attr))
529                 return 0;
530
531         return attr->mode;
532 }
533
534 static const struct attribute_group rbd_bus_group = {
535         .attrs = rbd_bus_attrs,
536         .is_visible = rbd_bus_is_visible,
537 };
538 __ATTRIBUTE_GROUPS(rbd_bus);
539
540 static struct bus_type rbd_bus_type = {
541         .name           = "rbd",
542         .bus_groups     = rbd_bus_groups,
543 };
544
545 static void rbd_root_dev_release(struct device *dev)
546 {
547 }
548
549 static struct device rbd_root_dev = {
550         .init_name =    "rbd",
551         .release =      rbd_root_dev_release,
552 };
553
554 static __printf(2, 3)
555 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
556 {
557         struct va_format vaf;
558         va_list args;
559
560         va_start(args, fmt);
561         vaf.fmt = fmt;
562         vaf.va = &args;
563
564         if (!rbd_dev)
565                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
566         else if (rbd_dev->disk)
567                 printk(KERN_WARNING "%s: %s: %pV\n",
568                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
569         else if (rbd_dev->spec && rbd_dev->spec->image_name)
570                 printk(KERN_WARNING "%s: image %s: %pV\n",
571                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
572         else if (rbd_dev->spec && rbd_dev->spec->image_id)
573                 printk(KERN_WARNING "%s: id %s: %pV\n",
574                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
575         else    /* punt */
576                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
577                         RBD_DRV_NAME, rbd_dev, &vaf);
578         va_end(args);
579 }
580
581 #ifdef RBD_DEBUG
582 #define rbd_assert(expr)                                                \
583                 if (unlikely(!(expr))) {                                \
584                         printk(KERN_ERR "\nAssertion failure in %s() "  \
585                                                 "at line %d:\n\n"       \
586                                         "\trbd_assert(%s);\n\n",        \
587                                         __func__, __LINE__, #expr);     \
588                         BUG();                                          \
589                 }
590 #else /* !RBD_DEBUG */
591 #  define rbd_assert(expr)      ((void) 0)
592 #endif /* !RBD_DEBUG */
593
594 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
595 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
596 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
597 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
598
599 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
600 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
601 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
602 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
603 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
604                                         u64 snap_id);
605 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
606                                 u8 *order, u64 *snap_size);
607 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
608                 u64 *snap_features);
609
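/*
 * Open handler for the block device.  Write opens of a read-only
 * mapping are refused, as are opens racing with removal; otherwise
 * bump open_count and take a reference on the device.
 */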
610 static int rbd_open(struct block_device *bdev, fmode_t mode)
611 {
612         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
613         bool removing = false;
614
615         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
616                 return -EROFS;
617
618         spin_lock_irq(&rbd_dev->lock);
619         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
620                 removing = true;
621         else
622                 rbd_dev->open_count++;
623         spin_unlock_irq(&rbd_dev->lock);
624         if (removing)
625                 return -ENOENT;
626
627         (void) get_device(&rbd_dev->dev);
628
629         return 0;
630 }
631
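/* Release handler: drop open_count and the reference taken in rbd_open() */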
632 static void rbd_release(struct gendisk *disk, fmode_t mode)
633 {
634         struct rbd_device *rbd_dev = disk->private_data;
635         unsigned long open_count_before;
636
637         spin_lock_irq(&rbd_dev->lock);
638         open_count_before = rbd_dev->open_count--;
639         spin_unlock_irq(&rbd_dev->lock);
640         rbd_assert(open_count_before > 0);
641
642         put_device(&rbd_dev->dev);
643 }
644
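/*
 * BLKROSET handler: update the mapping's read-only state.  Refused if
 * the caller tries to make a mapped snapshot writable, or if the
 * device is open more than once.
 */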
645 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
646 {
647         int ret = 0;
648         int val;
649         bool ro;
650         bool ro_changed = false;
651
652         /* get_user() may sleep, so call it before taking rbd_dev->lock */
653         if (get_user(val, (int __user *)(arg)))
654                 return -EFAULT;
655
656         ro = val ? true : false;
657         /* A mapped snapshot is read-only, so it can't be made writable */
658         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
659                 return -EROFS;
660
661         spin_lock_irq(&rbd_dev->lock);
662         /* refuse if anyone else has this device open */
663         if (rbd_dev->open_count > 1) {
664                 ret = -EBUSY;
665                 goto out;
666         }
667
668         if (rbd_dev->mapping.read_only != ro) {
669                 rbd_dev->mapping.read_only = ro;
670                 ro_changed = true;
671         }
672
673 out:
674         spin_unlock_irq(&rbd_dev->lock);
675         /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
676         if (ret == 0 && ro_changed)
677                 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
678
679         return ret;
680 }
681
682 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
683                         unsigned int cmd, unsigned long arg)
684 {
685         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
686         int ret = 0;
687
688         switch (cmd) {
689         case BLKROSET:
690                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
691                 break;
692         default:
693                 ret = -ENOTTY;
694         }
695
696         return ret;
697 }
698
699 #ifdef CONFIG_COMPAT
700 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
701                                 unsigned int cmd, unsigned long arg)
702 {
703         return rbd_ioctl(bdev, mode, cmd, arg);
704 }
705 #endif /* CONFIG_COMPAT */
706
707 static const struct block_device_operations rbd_bd_ops = {
708         .owner                  = THIS_MODULE,
709         .open                   = rbd_open,
710         .release                = rbd_release,
711         .ioctl                  = rbd_ioctl,
712 #ifdef CONFIG_COMPAT
713         .compat_ioctl           = rbd_compat_ioctl,
714 #endif
715 };
716
717 /*
718  * Initialize an rbd client instance.  Success or not, this function
719  * consumes ceph_opts.  Caller holds client_mutex.
720  */
721 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
722 {
723         struct rbd_client *rbdc;
724         int ret = -ENOMEM;
725
726         dout("%s:\n", __func__);
727         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
728         if (!rbdc)
729                 goto out_opt;
730
731         kref_init(&rbdc->kref);
732         INIT_LIST_HEAD(&rbdc->node);
733
734         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
735         if (IS_ERR(rbdc->client))
736                 goto out_rbdc;
737         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
738
739         ret = ceph_open_session(rbdc->client);
740         if (ret < 0)
741                 goto out_client;
742
743         spin_lock(&rbd_client_list_lock);
744         list_add_tail(&rbdc->node, &rbd_client_list);
745         spin_unlock(&rbd_client_list_lock);
746
747         dout("%s: rbdc %p\n", __func__, rbdc);
748
749         return rbdc;
750 out_client:
751         ceph_destroy_client(rbdc->client);
752 out_rbdc:
753         kfree(rbdc);
754 out_opt:
755         if (ceph_opts)
756                 ceph_destroy_options(ceph_opts);
757         dout("%s: error %d\n", __func__, ret);
758
759         return ERR_PTR(ret);
760 }
761
762 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
763 {
764         kref_get(&rbdc->kref);
765
766         return rbdc;
767 }
768
769 /*
770  * Find a ceph client with specific addr and configuration.  If
771  * found, bump its reference count.
772  */
773 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
774 {
775         struct rbd_client *client_node;
776         bool found = false;
777
778         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
779                 return NULL;
780
781         spin_lock(&rbd_client_list_lock);
782         list_for_each_entry(client_node, &rbd_client_list, node) {
783                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
784                         __rbd_get_client(client_node);
785
786                         found = true;
787                         break;
788                 }
789         }
790         spin_unlock(&rbd_client_list_lock);
791
792         return found ? client_node : NULL;
793 }
794
795 /*
796  * (Per device) rbd map options
797  */
798 enum {
799         Opt_queue_depth,
800         Opt_last_int,
801         /* int args above */
802         Opt_last_string,
803         /* string args above */
804         Opt_read_only,
805         Opt_read_write,
806         Opt_lock_on_read,
807         Opt_err
808 };
809
810 static match_table_t rbd_opts_tokens = {
811         {Opt_queue_depth, "queue_depth=%d"},
812         /* int args above */
813         /* string args above */
814         {Opt_read_only, "read_only"},
815         {Opt_read_only, "ro"},          /* Alternate spelling */
816         {Opt_read_write, "read_write"},
817         {Opt_read_write, "rw"},         /* Alternate spelling */
818         {Opt_lock_on_read, "lock_on_read"},
819         {Opt_err, NULL}
820 };
821
822 struct rbd_options {
823         int     queue_depth;
824         bool    read_only;
825         bool    lock_on_read;
826 };
827
828 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
829 #define RBD_READ_ONLY_DEFAULT   false
830 #define RBD_LOCK_ON_READ_DEFAULT false
831
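/* Parse a single rbd map option token and record it in rbd_options */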
832 static int parse_rbd_opts_token(char *c, void *private)
833 {
834         struct rbd_options *rbd_opts = private;
835         substring_t argstr[MAX_OPT_ARGS];
836         int token, intval, ret;
837
838         token = match_token(c, rbd_opts_tokens, argstr);
839         if (token < Opt_last_int) {
840                 ret = match_int(&argstr[0], &intval);
841                 if (ret < 0) {
842                         pr_err("bad mount option arg (not int) at '%s'\n", c);
843                         return ret;
844                 }
845                 dout("got int token %d val %d\n", token, intval);
846         } else if (token > Opt_last_int && token < Opt_last_string) {
847                 dout("got string token %d val %s\n", token, argstr[0].from);
848         } else {
849                 dout("got token %d\n", token);
850         }
851
852         switch (token) {
853         case Opt_queue_depth:
854                 if (intval < 1) {
855                         pr_err("queue_depth out of range\n");
856                         return -EINVAL;
857                 }
858                 rbd_opts->queue_depth = intval;
859                 break;
860         case Opt_read_only:
861                 rbd_opts->read_only = true;
862                 break;
863         case Opt_read_write:
864                 rbd_opts->read_only = false;
865                 break;
866         case Opt_lock_on_read:
867                 rbd_opts->lock_on_read = true;
868                 break;
869         default:
870                 /* libceph prints "bad option" msg */
871                 return -EINVAL;
872         }
873
874         return 0;
875 }
876
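/* Human-readable name of an object operation, for log and debug messages */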
877 static char *obj_op_name(enum obj_operation_type op_type)
878 {
879         switch (op_type) {
880         case OBJ_OP_READ:
881                 return "read";
882         case OBJ_OP_WRITE:
883                 return "write";
884         case OBJ_OP_DISCARD:
885                 return "discard";
886         default:
887                 return "???";
888         }
889 }
890
891 /*
892  * Get a ceph client with specific addr and configuration; if one does
893  * not exist, create it.  Either way, ceph_opts is consumed by this
894  * function.
895  */
896 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
897 {
898         struct rbd_client *rbdc;
899
900         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
901         rbdc = rbd_client_find(ceph_opts);
902         if (rbdc)       /* using an existing client */
903                 ceph_destroy_options(ceph_opts);
904         else
905                 rbdc = rbd_client_create(ceph_opts);
906         mutex_unlock(&client_mutex);
907
908         return rbdc;
909 }
910
911 /*
912  * Destroy ceph client
913  *
914  * Caller must hold rbd_client_list_lock.
915  */
916 static void rbd_client_release(struct kref *kref)
917 {
918         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
919
920         dout("%s: rbdc %p\n", __func__, rbdc);
921         spin_lock(&rbd_client_list_lock);
922         list_del(&rbdc->node);
923         spin_unlock(&rbd_client_list_lock);
924
925         ceph_destroy_client(rbdc->client);
926         kfree(rbdc);
927 }
928
929 /*
930  * Drop reference to ceph client node. If it's not referenced anymore, release
931  * it.
932  */
933 static void rbd_put_client(struct rbd_client *rbdc)
934 {
935         if (rbdc)
936                 kref_put(&rbdc->kref, rbd_client_release);
937 }
938
939 static bool rbd_image_format_valid(u32 image_format)
940 {
941         return image_format == 1 || image_format == 2;
942 }
943
944 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
945 {
946         size_t size;
947         u32 snap_count;
948
949         /* The header has to start with the magic rbd header text */
950         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
951                 return false;
952
953         /* The bio layer requires at least sector-sized I/O */
954
955         if (ondisk->options.order < SECTOR_SHIFT)
956                 return false;
957
958         /* If we use u64 in a few spots we may be able to loosen this */
959
960         if (ondisk->options.order > 8 * sizeof (int) - 1)
961                 return false;
962
963         /*
964          * The size of a snapshot header has to fit in a size_t, and
965          * that limits the number of snapshots.
966          */
967         snap_count = le32_to_cpu(ondisk->snap_count);
968         size = SIZE_MAX - sizeof (struct ceph_snap_context);
969         if (snap_count > size / sizeof (__le64))
970                 return false;
971
972         /*
973          * Not only that, but the size of the entire snapshot
974          * header must also be representable in a size_t.
975          */
976         size -= snap_count * sizeof (__le64);
977         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
978                 return false;
979
980         return true;
981 }
982
983 /*
984  * returns the size of an object in the image
985  */
986 static u32 rbd_obj_bytes(struct rbd_image_header *header)
987 {
988         return 1U << header->obj_order;
989 }
990
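/*
 * Set up the ceph_file_layout used for all requests to this image.
 * Images without fancy striping default to one object-sized stripe,
 * and data goes to the separate data pool if one is configured.
 */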
991 static void rbd_init_layout(struct rbd_device *rbd_dev)
992 {
993         if (rbd_dev->header.stripe_unit == 0 ||
994             rbd_dev->header.stripe_count == 0) {
995                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
996                 rbd_dev->header.stripe_count = 1;
997         }
998
999         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1000         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1001         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1002         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1003                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1004         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1005 }
1006
1007 /*
1008  * Fill an rbd image header with information from the given format 1
1009  * on-disk header.
1010  */
1011 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1012                                  struct rbd_image_header_ondisk *ondisk)
1013 {
1014         struct rbd_image_header *header = &rbd_dev->header;
1015         bool first_time = header->object_prefix == NULL;
1016         struct ceph_snap_context *snapc;
1017         char *object_prefix = NULL;
1018         char *snap_names = NULL;
1019         u64 *snap_sizes = NULL;
1020         u32 snap_count;
1021         int ret = -ENOMEM;
1022         u32 i;
1023
1024         /* Allocate this now to avoid having to handle failure below */
1025
1026         if (first_time) {
1027                 object_prefix = kstrndup(ondisk->object_prefix,
1028                                          sizeof(ondisk->object_prefix),
1029                                          GFP_KERNEL);
1030                 if (!object_prefix)
1031                         return -ENOMEM;
1032         }
1033
1034         /* Allocate the snapshot context and fill it in */
1035
1036         snap_count = le32_to_cpu(ondisk->snap_count);
1037         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1038         if (!snapc)
1039                 goto out_err;
1040         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1041         if (snap_count) {
1042                 struct rbd_image_snap_ondisk *snaps;
1043                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1044
1045                 /* We'll keep a copy of the snapshot names... */
1046
1047                 if (snap_names_len > (u64)SIZE_MAX)
1048                         goto out_2big;
1049                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1050                 if (!snap_names)
1051                         goto out_err;
1052
1053                 /* ...as well as the array of their sizes. */
1054                 snap_sizes = kmalloc_array(snap_count,
1055                                            sizeof(*header->snap_sizes),
1056                                            GFP_KERNEL);
1057                 if (!snap_sizes)
1058                         goto out_err;
1059
1060                 /*
1061                  * Copy the names, and fill in each snapshot's id
1062                  * and size.
1063                  *
1064                  * Note that rbd_dev_v1_header_info() guarantees the
1065                  * ondisk buffer we're working with has
1066                  * snap_names_len bytes beyond the end of the
1067                  * snapshot id array, so this memcpy() is safe.
1068                  */
1069                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1070                 snaps = ondisk->snaps;
1071                 for (i = 0; i < snap_count; i++) {
1072                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1073                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1074                 }
1075         }
1076
1077         /* We won't fail any more, fill in the header */
1078
1079         if (first_time) {
1080                 header->object_prefix = object_prefix;
1081                 header->obj_order = ondisk->options.order;
1082                 rbd_init_layout(rbd_dev);
1083         } else {
1084                 ceph_put_snap_context(header->snapc);
1085                 kfree(header->snap_names);
1086                 kfree(header->snap_sizes);
1087         }
1088
1089         /* The remaining fields always get updated (when we refresh) */
1090
1091         header->image_size = le64_to_cpu(ondisk->image_size);
1092         header->snapc = snapc;
1093         header->snap_names = snap_names;
1094         header->snap_sizes = snap_sizes;
1095
1096         return 0;
1097 out_2big:
1098         ret = -EIO;
1099 out_err:
1100         kfree(snap_sizes);
1101         kfree(snap_names);
1102         ceph_put_snap_context(snapc);
1103         kfree(object_prefix);
1104
1105         return ret;
1106 }
1107
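/*
 * Return a dynamically allocated copy (kstrdup()) of the name of the
 * snapshot at position "which" in the format 1 snapshot name list.
 */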
1108 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1109 {
1110         const char *snap_name;
1111
1112         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1113
1114         /* Skip over names until we find the one we are looking for */
1115
1116         snap_name = rbd_dev->header.snap_names;
1117         while (which--)
1118                 snap_name += strlen(snap_name) + 1;
1119
1120         return kstrdup(snap_name, GFP_KERNEL);
1121 }
1122
1123 /*
1124  * Snapshot id comparison function for use with qsort()/bsearch().
1125  * Note that the result is for snapshots in *descending* order.
1126  */
1127 static int snapid_compare_reverse(const void *s1, const void *s2)
1128 {
1129         u64 snap_id1 = *(u64 *)s1;
1130         u64 snap_id2 = *(u64 *)s2;
1131
1132         if (snap_id1 < snap_id2)
1133                 return 1;
1134         return snap_id1 == snap_id2 ? 0 : -1;
1135 }
1136
1137 /*
1138  * Search a snapshot context to see if the given snapshot id is
1139  * present.
1140  *
1141  * Returns the position of the snapshot id in the array if it's found,
1142  * or BAD_SNAP_INDEX otherwise.
1143  *
1144  * Note: The snapshot array is kept sorted (by the osd) in
1145  * reverse order, highest snapshot id first.
1146  */
1147 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1148 {
1149         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1150         u64 *found;
1151
1152         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1153                                 sizeof (snap_id), snapid_compare_reverse);
1154
1155         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1156 }
1157
1158 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1159                                         u64 snap_id)
1160 {
1161         u32 which;
1162         const char *snap_name;
1163
1164         which = rbd_dev_snap_index(rbd_dev, snap_id);
1165         if (which == BAD_SNAP_INDEX)
1166                 return ERR_PTR(-ENOENT);
1167
1168         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1169         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1170 }
1171
1172 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1173 {
1174         if (snap_id == CEPH_NOSNAP)
1175                 return RBD_SNAP_HEAD_NAME;
1176
1177         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1178         if (rbd_dev->image_format == 1)
1179                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1180
1181         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1182 }
1183
1184 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1185                                 u64 *snap_size)
1186 {
1187         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1188         if (snap_id == CEPH_NOSNAP) {
1189                 *snap_size = rbd_dev->header.image_size;
1190         } else if (rbd_dev->image_format == 1) {
1191                 u32 which;
1192
1193                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1194                 if (which == BAD_SNAP_INDEX)
1195                         return -ENOENT;
1196
1197                 *snap_size = rbd_dev->header.snap_sizes[which];
1198         } else {
1199                 u64 size = 0;
1200                 int ret;
1201
1202                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1203                 if (ret)
1204                         return ret;
1205
1206                 *snap_size = size;
1207         }
1208         return 0;
1209 }
1210
1211 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1212                         u64 *snap_features)
1213 {
1214         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1215         if (snap_id == CEPH_NOSNAP) {
1216                 *snap_features = rbd_dev->header.features;
1217         } else if (rbd_dev->image_format == 1) {
1218                 *snap_features = 0;     /* No features for format 1 */
1219         } else {
1220                 u64 features = 0;
1221                 int ret;
1222
1223                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1224                 if (ret)
1225                         return ret;
1226
1227                 *snap_features = features;
1228         }
1229         return 0;
1230 }
1231
1232 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1233 {
1234         u64 snap_id = rbd_dev->spec->snap_id;
1235         u64 size = 0;
1236         u64 features = 0;
1237         int ret;
1238
1239         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1240         if (ret)
1241                 return ret;
1242         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1243         if (ret)
1244                 return ret;
1245
1246         rbd_dev->mapping.size = size;
1247         rbd_dev->mapping.features = features;
1248
1249         return 0;
1250 }
1251
1252 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1253 {
1254         rbd_dev->mapping.size = 0;
1255         rbd_dev->mapping.features = 0;
1256 }
1257
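/*
 * Image data is striped over objects of rbd_obj_bytes() (a power of
 * two), so a segment's starting offset and maximum length can be
 * computed by simple masking against the object size.
 */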
1258 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1259 {
1260         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1261
1262         return offset & (segment_size - 1);
1263 }
1264
1265 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1266                                 u64 offset, u64 length)
1267 {
1268         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1269
1270         offset &= segment_size - 1;
1271
1272         rbd_assert(length <= U64_MAX - offset);
1273         if (offset + length > segment_size)
1274                 length = segment_size - offset;
1275
1276         return length;
1277 }
1278
1279 /*
1280  * bio helpers
1281  */
1282
1283 static void bio_chain_put(struct bio *chain)
1284 {
1285         struct bio *tmp;
1286
1287         while (chain) {
1288                 tmp = chain;
1289                 chain = chain->bi_next;
1290                 bio_put(tmp);
1291         }
1292 }
1293
1294 /*
1295  * zeros a bio chain, starting at specific offset
1296  */
1297 static void zero_bio_chain(struct bio *chain, int start_ofs)
1298 {
1299         struct bio_vec bv;
1300         struct bvec_iter iter;
1301         unsigned long flags;
1302         void *buf;
1303         int pos = 0;
1304
1305         while (chain) {
1306                 bio_for_each_segment(bv, chain, iter) {
1307                         if (pos + bv.bv_len > start_ofs) {
1308                                 int remainder = max(start_ofs - pos, 0);
1309                                 buf = bvec_kmap_irq(&bv, &flags);
1310                                 memset(buf + remainder, 0,
1311                                        bv.bv_len - remainder);
1312                                 flush_dcache_page(bv.bv_page);
1313                                 bvec_kunmap_irq(buf, &flags);
1314                         }
1315                         pos += bv.bv_len;
1316                 }
1317
1318                 chain = chain->bi_next;
1319         }
1320 }
1321
1322 /*
1323  * similar to zero_bio_chain(), zeros data defined by a page array,
1324  * starting at the given byte offset from the start of the array and
1325  * continuing up to the given end offset.  The pages array is
1326  * assumed to be big enough to hold all bytes up to the end.
1327  */
1328 static void zero_pages(struct page **pages, u64 offset, u64 end)
1329 {
1330         struct page **page = &pages[offset >> PAGE_SHIFT];
1331
1332         rbd_assert(end > offset);
1333         rbd_assert(end - offset <= (u64)SIZE_MAX);
1334         while (offset < end) {
1335                 size_t page_offset;
1336                 size_t length;
1337                 unsigned long flags;
1338                 void *kaddr;
1339
1340                 page_offset = offset & ~PAGE_MASK;
1341                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1342                 local_irq_save(flags);
1343                 kaddr = kmap_atomic(*page);
1344                 memset(kaddr + page_offset, 0, length);
1345                 flush_dcache_page(*page);
1346                 kunmap_atomic(kaddr);
1347                 local_irq_restore(flags);
1348
1349                 offset += length;
1350                 page++;
1351         }
1352 }
1353
1354 /*
1355  * Clone a portion of a bio, starting at the given byte offset
1356  * and continuing for the number of bytes indicated.
1357  */
1358 static struct bio *bio_clone_range(struct bio *bio_src,
1359                                         unsigned int offset,
1360                                         unsigned int len,
1361                                         gfp_t gfpmask)
1362 {
1363         struct bio *bio;
1364
1365         bio = bio_clone(bio_src, gfpmask);
1366         if (!bio)
1367                 return NULL;    /* ENOMEM */
1368
1369         bio_advance(bio, offset);
1370         bio->bi_iter.bi_size = len;
1371
1372         return bio;
1373 }
1374
1375 /*
1376  * Clone a portion of a bio chain, starting at the given byte offset
1377  * into the first bio in the source chain and continuing for the
1378  * number of bytes indicated.  The result is another bio chain of
1379  * exactly the given length, or a null pointer on error.
1380  *
1381  * The bio_src and offset parameters are both in-out.  On entry they
1382  * refer to the first source bio and the offset into that bio where
1383  * the start of data to be cloned is located.
1384  *
1385  * On return, bio_src is updated to refer to the bio in the source
1386  * chain that contains the first un-cloned byte, and *offset will
1387  * contain the offset of that byte within that bio.
1388  */
1389 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1390                                         unsigned int *offset,
1391                                         unsigned int len,
1392                                         gfp_t gfpmask)
1393 {
1394         struct bio *bi = *bio_src;
1395         unsigned int off = *offset;
1396         struct bio *chain = NULL;
1397         struct bio **end;
1398
1399         /* Build up a chain of clone bios up to the limit */
1400
1401         if (!bi || off >= bi->bi_iter.bi_size || !len)
1402                 return NULL;            /* Nothing to clone */
1403
1404         end = &chain;
1405         while (len) {
1406                 unsigned int bi_size;
1407                 struct bio *bio;
1408
1409                 if (!bi) {
1410                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1411                         goto out_err;   /* EINVAL; ran out of bio's */
1412                 }
1413                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1414                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1415                 if (!bio)
1416                         goto out_err;   /* ENOMEM */
1417
1418                 *end = bio;
1419                 end = &bio->bi_next;
1420
1421                 off += bi_size;
1422                 if (off == bi->bi_iter.bi_size) {
1423                         bi = bi->bi_next;
1424                         off = 0;
1425                 }
1426                 len -= bi_size;
1427         }
1428         *bio_src = bi;
1429         *offset = off;
1430
1431         return chain;
1432 out_err:
1433         bio_chain_put(chain);
1434
1435         return NULL;
1436 }
1437
1438 /*
1439  * The default/initial value for all object request flags is 0.  For
1440  * each flag, once its value is set to 1 it is never reset to 0
1441  * again.
1442  */
1443 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1444 {
1445         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1446                 struct rbd_device *rbd_dev;
1447
1448                 rbd_dev = obj_request->img_request->rbd_dev;
1449                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1450                         obj_request);
1451         }
1452 }
1453
1454 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1455 {
1456         smp_mb();
1457         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1458 }
1459
1460 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1461 {
1462         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1463                 struct rbd_device *rbd_dev = NULL;
1464
1465                 if (obj_request_img_data_test(obj_request))
1466                         rbd_dev = obj_request->img_request->rbd_dev;
1467                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1468                         obj_request);
1469         }
1470 }
1471
1472 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1473 {
1474         smp_mb();
1475         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1476 }
1477
1478 /*
1479  * This sets the KNOWN flag after (possibly) setting the EXISTS
1480  * flag.  The latter is set based on the "exists" value provided.
1481  *
1482  * Note that for our purposes once an object exists it never goes
1483  * away again.  It's possible that the responses from two existence
1484  * checks are separated by the creation of the target object, and
1485  * the first ("doesn't exist") response arrives *after* the second
1486  * ("does exist").  In that case we ignore the second one.
1487  */
1488 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1489                                 bool exists)
1490 {
1491         if (exists)
1492                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1493         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1494         smp_mb();
1495 }
1496
1497 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1498 {
1499         smp_mb();
1500         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1501 }
1502
1503 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1504 {
1505         smp_mb();
1506         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1507 }
1508
1509 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1510 {
1511         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1512
1513         return obj_request->img_offset <
1514             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1515 }
1516
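/*
 * Object request reference counting: the request is destroyed when the
 * last reference is dropped (see rbd_obj_request_destroy()).
 */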
1517 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1518 {
1519         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1520                 kref_read(&obj_request->kref));
1521         kref_get(&obj_request->kref);
1522 }
1523
1524 static void rbd_obj_request_destroy(struct kref *kref);
1525 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1526 {
1527         rbd_assert(obj_request != NULL);
1528         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1529                 kref_read(&obj_request->kref));
1530         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1531 }
1532
1533 static void rbd_img_request_get(struct rbd_img_request *img_request)
1534 {
1535         dout("%s: img %p (was %d)\n", __func__, img_request,
1536              kref_read(&img_request->kref));
1537         kref_get(&img_request->kref);
1538 }
1539
1540 static bool img_request_child_test(struct rbd_img_request *img_request);
1541 static void rbd_parent_request_destroy(struct kref *kref);
1542 static void rbd_img_request_destroy(struct kref *kref);
1543 static void rbd_img_request_put(struct rbd_img_request *img_request)
1544 {
1545         rbd_assert(img_request != NULL);
1546         dout("%s: img %p (was %d)\n", __func__, img_request,
1547                 kref_read(&img_request->kref));
1548         if (img_request_child_test(img_request))
1549                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1550         else
1551                 kref_put(&img_request->kref, rbd_img_request_destroy);
1552 }
1553
1554 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1555                                         struct rbd_obj_request *obj_request)
1556 {
1557         rbd_assert(obj_request->img_request == NULL);
1558
1559         /* Image request now owns object's original reference */
1560         obj_request->img_request = img_request;
1561         obj_request->which = img_request->obj_request_count;
1562         rbd_assert(!obj_request_img_data_test(obj_request));
1563         obj_request_img_data_set(obj_request);
1564         rbd_assert(obj_request->which != BAD_WHICH);
1565         img_request->obj_request_count++;
1566         list_add_tail(&obj_request->links, &img_request->obj_requests);
1567         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1568                 obj_request->which);
1569 }
1570
1571 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1572                                         struct rbd_obj_request *obj_request)
1573 {
1574         rbd_assert(obj_request->which != BAD_WHICH);
1575
1576         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1577                 obj_request->which);
1578         list_del(&obj_request->links);
1579         rbd_assert(img_request->obj_request_count > 0);
1580         img_request->obj_request_count--;
1581         rbd_assert(obj_request->which == img_request->obj_request_count);
1582         obj_request->which = BAD_WHICH;
1583         rbd_assert(obj_request_img_data_test(obj_request));
1584         rbd_assert(obj_request->img_request == img_request);
1585         obj_request->img_request = NULL;
1586         obj_request->callback = NULL;
1587         rbd_obj_request_put(obj_request);
1588 }
1589
1590 static bool obj_request_type_valid(enum obj_request_type type)
1591 {
1592         switch (type) {
1593         case OBJ_REQUEST_NODATA:
1594         case OBJ_REQUEST_BIO:
1595         case OBJ_REQUEST_PAGES:
1596                 return true;
1597         default:
1598                 return false;
1599         }
1600 }
1601
1602 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1603
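/*
 * Hand an object request to the OSD client.  For requests that carry
 * image data an extra image request reference is taken here; it is
 * dropped by the matching put in rbd_img_obj_callback().
 */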
1604 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1605 {
1606         struct ceph_osd_request *osd_req = obj_request->osd_req;
1607
1608         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1609              obj_request, obj_request->object_no, obj_request->offset,
1610              obj_request->length, osd_req);
1611         if (obj_request_img_data_test(obj_request)) {
1612                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1613                 rbd_img_request_get(obj_request->img_request);
1614         }
1615         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1616 }
1617
1618 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1619 {
1620
1621         dout("%s: img %p\n", __func__, img_request);
1622
1623         /*
1624          * If no error occurred, compute the aggregate transfer
1625          * count for the image request.  We could instead use
1626          * atomic64_cmpxchg() to update it as each object request
1627          * completes; not clear which way is better off hand.
1628          */
1629         if (!img_request->result) {
1630                 struct rbd_obj_request *obj_request;
1631                 u64 xferred = 0;
1632
1633                 for_each_obj_request(img_request, obj_request)
1634                         xferred += obj_request->xferred;
1635                 img_request->xferred = xferred;
1636         }
1637
1638         if (img_request->callback)
1639                 img_request->callback(img_request);
1640         else
1641                 rbd_img_request_put(img_request);
1642 }
1643
1644 /*
1645  * The default/initial value for all image request flags is 0.  Each
1646  * is conditionally set to 1 at image request initialization time
1647  * and currently never changes thereafter.
1648  */
1649 static void img_request_write_set(struct rbd_img_request *img_request)
1650 {
1651         set_bit(IMG_REQ_WRITE, &img_request->flags);
1652         smp_mb();
1653 }
1654
1655 static bool img_request_write_test(struct rbd_img_request *img_request)
1656 {
1657         smp_mb();
1658         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1659 }
1660
1661 /*
1662  * Set the discard flag when the img_request is a discard request
1663  */
1664 static void img_request_discard_set(struct rbd_img_request *img_request)
1665 {
1666         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1667         smp_mb();
1668 }
1669
1670 static bool img_request_discard_test(struct rbd_img_request *img_request)
1671 {
1672         smp_mb();
1673         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1674 }
1675
1676 static void img_request_child_set(struct rbd_img_request *img_request)
1677 {
1678         set_bit(IMG_REQ_CHILD, &img_request->flags);
1679         smp_mb();
1680 }
1681
1682 static void img_request_child_clear(struct rbd_img_request *img_request)
1683 {
1684         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1685         smp_mb();
1686 }
1687
1688 static bool img_request_child_test(struct rbd_img_request *img_request)
1689 {
1690         smp_mb();
1691         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1692 }
1693
1694 static void img_request_layered_set(struct rbd_img_request *img_request)
1695 {
1696         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1697         smp_mb();
1698 }
1699
1700 static void img_request_layered_clear(struct rbd_img_request *img_request)
1701 {
1702         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1703         smp_mb();
1704 }
1705
1706 static bool img_request_layered_test(struct rbd_img_request *img_request)
1707 {
1708         smp_mb();
1709         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1710 }
1711
1712 static enum obj_operation_type
1713 rbd_img_request_op_type(struct rbd_img_request *img_request)
1714 {
1715         if (img_request_write_test(img_request))
1716                 return OBJ_OP_WRITE;
1717         else if (img_request_discard_test(img_request))
1718                 return OBJ_OP_DISCARD;
1719         else
1720                 return OBJ_OP_READ;
1721 }
1722
1723 static void
1724 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1725 {
1726         u64 xferred = obj_request->xferred;
1727         u64 length = obj_request->length;
1728
1729         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1730                 obj_request, obj_request->img_request, obj_request->result,
1731                 xferred, length);
1732         /*
1733          * ENOENT means a hole in the image.  We zero-fill the entire
1734          * length of the request.  A short read also implies zero-fill
1735          * to the end of the request.  An error requires the whole
1736          * length of the request to be reported finished with an error
1737          * to the block layer.  In each case we update the xferred
1738          * count to indicate the whole request was satisfied.
1739          */
1740         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1741         if (obj_request->result == -ENOENT) {
1742                 if (obj_request->type == OBJ_REQUEST_BIO)
1743                         zero_bio_chain(obj_request->bio_list, 0);
1744                 else
1745                         zero_pages(obj_request->pages, 0, length);
1746                 obj_request->result = 0;
1747         } else if (xferred < length && !obj_request->result) {
1748                 if (obj_request->type == OBJ_REQUEST_BIO)
1749                         zero_bio_chain(obj_request->bio_list, xferred);
1750                 else
1751                         zero_pages(obj_request->pages, xferred, length);
1752         }
1753         obj_request->xferred = length;
1754         obj_request_done_set(obj_request);
1755 }
1756
1757 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1758 {
1759         dout("%s: obj %p cb %p\n", __func__, obj_request,
1760                 obj_request->callback);
1761         if (obj_request->callback)
1762                 obj_request->callback(obj_request);
1763         else
1764                 complete_all(&obj_request->completion);
1765 }
1766
1767 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1768 {
1769         obj_request->result = err;
1770         obj_request->xferred = 0;
1771         /*
1772          * kludge - mirror rbd_obj_request_submit() to match a put in
1773          * rbd_img_obj_callback()
1774          */
1775         if (obj_request_img_data_test(obj_request)) {
1776                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1777                 rbd_img_request_get(obj_request->img_request);
1778         }
1779         obj_request_done_set(obj_request);
1780         rbd_obj_request_complete(obj_request);
1781 }
1782
1783 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1784 {
1785         struct rbd_img_request *img_request = NULL;
1786         struct rbd_device *rbd_dev = NULL;
1787         bool layered = false;
1788
1789         if (obj_request_img_data_test(obj_request)) {
1790                 img_request = obj_request->img_request;
1791                 layered = img_request && img_request_layered_test(img_request);
1792                 rbd_dev = img_request->rbd_dev;
1793         }
1794
1795         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1796                 obj_request, img_request, obj_request->result,
1797                 obj_request->xferred, obj_request->length);
1798         if (layered && obj_request->result == -ENOENT &&
1799                         obj_request->img_offset < rbd_dev->parent_overlap)
1800                 rbd_img_parent_read(obj_request);
1801         else if (img_request)
1802                 rbd_img_obj_request_read_callback(obj_request);
1803         else
1804                 obj_request_done_set(obj_request);
1805 }
1806
1807 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1808 {
1809         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1810                 obj_request->result, obj_request->length);
1811         /*
1812          * There is no such thing as a successful short write.  Set
1813          * it to our originally-requested length.
1814          */
1815         obj_request->xferred = obj_request->length;
1816         obj_request_done_set(obj_request);
1817 }
1818
1819 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1820 {
1821         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1822                 obj_request->result, obj_request->length);
1823         /*
1824          * There is no such thing as a successful short discard.  Set
1825          * it to our originally-requested length.
1826          */
1827         obj_request->xferred = obj_request->length;
1828         /* discarding a non-existent object is not a problem */
1829         if (obj_request->result == -ENOENT)
1830                 obj_request->result = 0;
1831         obj_request_done_set(obj_request);
1832 }
1833
1834 /*
1835  * For a simple stat call there's nothing to do.  We'll do more if
1836  * this is part of a write sequence for a layered image.
1837  */
1838 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1839 {
1840         dout("%s: obj %p\n", __func__, obj_request);
1841         obj_request_done_set(obj_request);
1842 }
1843
1844 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1845 {
1846         dout("%s: obj %p\n", __func__, obj_request);
1847
1848         if (obj_request_img_data_test(obj_request))
1849                 rbd_osd_copyup_callback(obj_request);
1850         else
1851                 obj_request_done_set(obj_request);
1852 }
1853
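/*
 * Completion callback invoked by the OSD client when an OSD request
 * finishes.  The result is copied to the object request and the
 * type-specific callback is chosen based on the opcode of the
 * request's first op.
 */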
1854 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1855 {
1856         struct rbd_obj_request *obj_request = osd_req->r_priv;
1857         u16 opcode;
1858
1859         dout("%s: osd_req %p\n", __func__, osd_req);
1860         rbd_assert(osd_req == obj_request->osd_req);
1861         if (obj_request_img_data_test(obj_request)) {
1862                 rbd_assert(obj_request->img_request);
1863                 rbd_assert(obj_request->which != BAD_WHICH);
1864         } else {
1865                 rbd_assert(obj_request->which == BAD_WHICH);
1866         }
1867
1868         if (osd_req->r_result < 0)
1869                 obj_request->result = osd_req->r_result;
1870
1871         /*
1872          * We support a 64-bit length, but ultimately it has to be
1873          * passed to the block layer, which just supports a 32-bit
1874          * length field.
1875          */
1876         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1877         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1878
1879         opcode = osd_req->r_ops[0].op;
1880         switch (opcode) {
1881         case CEPH_OSD_OP_READ:
1882                 rbd_osd_read_callback(obj_request);
1883                 break;
1884         case CEPH_OSD_OP_SETALLOCHINT:
1885                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1886                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1887                 /* fall through */
1888         case CEPH_OSD_OP_WRITE:
1889         case CEPH_OSD_OP_WRITEFULL:
1890                 rbd_osd_write_callback(obj_request);
1891                 break;
1892         case CEPH_OSD_OP_STAT:
1893                 rbd_osd_stat_callback(obj_request);
1894                 break;
1895         case CEPH_OSD_OP_DELETE:
1896         case CEPH_OSD_OP_TRUNCATE:
1897         case CEPH_OSD_OP_ZERO:
1898                 rbd_osd_discard_callback(obj_request);
1899                 break;
1900         case CEPH_OSD_OP_CALL:
1901                 rbd_osd_call_callback(obj_request);
1902                 break;
1903         default:
1904                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1905                          obj_request->object_no, opcode);
1906                 break;
1907         }
1908
1909         if (obj_request_done_test(obj_request))
1910                 rbd_obj_request_complete(obj_request);
1911 }
1912
1913 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1914 {
1915         struct ceph_osd_request *osd_req = obj_request->osd_req;
1916
1917         rbd_assert(obj_request_img_data_test(obj_request));
1918         osd_req->r_snapid = obj_request->img_request->snap_id;
1919 }
1920
1921 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1922 {
1923         struct ceph_osd_request *osd_req = obj_request->osd_req;
1924
1925         ktime_get_real_ts(&osd_req->r_mtime);
1926         osd_req->r_data_offset = obj_request->offset;
1927 }
1928
1929 static struct ceph_osd_request *
1930 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1931                      struct ceph_snap_context *snapc,
1932                      int num_ops, unsigned int flags,
1933                      struct rbd_obj_request *obj_request)
1934 {
1935         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1936         struct ceph_osd_request *req;
1937         const char *name_format = rbd_dev->image_format == 1 ?
1938                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1939
1940         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1941         if (!req)
1942                 return NULL;
1943
1944         req->r_flags = flags;
1945         req->r_callback = rbd_osd_req_callback;
1946         req->r_priv = obj_request;
1947
1948         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1949         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1950                         rbd_dev->header.object_prefix, obj_request->object_no))
1951                 goto err_req;
1952
1953         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1954                 goto err_req;
1955
1956         return req;
1957
1958 err_req:
1959         ceph_osdc_put_request(req);
1960         return NULL;
1961 }
1962
1963 /*
1964  * Create an osd request.  A read request has one osd op (read).
1965  * A write request has either one (watch) or two (hint+write) osd ops.
1966  * (All rbd data writes are prefixed with an allocation hint op, but
1967  * technically osd watch is a write request, hence this distinction.)
1968  */
1969 static struct ceph_osd_request *rbd_osd_req_create(
1970                                         struct rbd_device *rbd_dev,
1971                                         enum obj_operation_type op_type,
1972                                         unsigned int num_ops,
1973                                         struct rbd_obj_request *obj_request)
1974 {
1975         struct ceph_snap_context *snapc = NULL;
1976
1977         if (obj_request_img_data_test(obj_request) &&
1978                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1979                 struct rbd_img_request *img_request = obj_request->img_request;
1980                 if (op_type == OBJ_OP_WRITE) {
1981                         rbd_assert(img_request_write_test(img_request));
1982                 } else {
1983                         rbd_assert(img_request_discard_test(img_request));
1984                 }
1985                 snapc = img_request->snapc;
1986         }
1987
1988         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1989
1990         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1991             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1992             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1993 }
1994
1995 /*
1996  * Create a copyup osd request based on the information in the object
1997  * request supplied.  A copyup request has two or three osd ops, a
1998  * copyup method call, potentially a hint op, and a write or truncate
1999  * or zero op.
2000  */
2001 static struct ceph_osd_request *
2002 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2003 {
2004         struct rbd_img_request *img_request;
2005         int num_osd_ops = 3;
2006
2007         rbd_assert(obj_request_img_data_test(obj_request));
2008         img_request = obj_request->img_request;
2009         rbd_assert(img_request);
2010         rbd_assert(img_request_write_test(img_request) ||
2011                         img_request_discard_test(img_request));
2012
2013         if (img_request_discard_test(img_request))
2014                 num_osd_ops = 2;
2015
2016         return __rbd_osd_req_create(img_request->rbd_dev,
2017                                     img_request->snapc, num_osd_ops,
2018                                     CEPH_OSD_FLAG_WRITE, obj_request);
2019 }
2020
2021 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2022 {
2023         ceph_osdc_put_request(osd_req);
2024 }
2025
2026 static struct rbd_obj_request *
2027 rbd_obj_request_create(enum obj_request_type type)
2028 {
2029         struct rbd_obj_request *obj_request;
2030
2031         rbd_assert(obj_request_type_valid(type));
2032
2033         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2034         if (!obj_request)
2035                 return NULL;
2036
2037         obj_request->which = BAD_WHICH;
2038         obj_request->type = type;
2039         INIT_LIST_HEAD(&obj_request->links);
2040         init_completion(&obj_request->completion);
2041         kref_init(&obj_request->kref);
2042
2043         dout("%s %p\n", __func__, obj_request);
2044         return obj_request;
2045 }
2046
2047 static void rbd_obj_request_destroy(struct kref *kref)
2048 {
2049         struct rbd_obj_request *obj_request;
2050
2051         obj_request = container_of(kref, struct rbd_obj_request, kref);
2052
2053         dout("%s: obj %p\n", __func__, obj_request);
2054
2055         rbd_assert(obj_request->img_request == NULL);
2056         rbd_assert(obj_request->which == BAD_WHICH);
2057
2058         if (obj_request->osd_req)
2059                 rbd_osd_req_destroy(obj_request->osd_req);
2060
2061         rbd_assert(obj_request_type_valid(obj_request->type));
2062         switch (obj_request->type) {
2063         case OBJ_REQUEST_NODATA:
2064                 break;          /* Nothing to do */
2065         case OBJ_REQUEST_BIO:
2066                 if (obj_request->bio_list)
2067                         bio_chain_put(obj_request->bio_list);
2068                 break;
2069         case OBJ_REQUEST_PAGES:
2070                 /* img_data requests don't own their page array */
2071                 if (obj_request->pages &&
2072                     !obj_request_img_data_test(obj_request))
2073                         ceph_release_page_vector(obj_request->pages,
2074                                                 obj_request->page_count);
2075                 break;
2076         }
2077
2078         kmem_cache_free(rbd_obj_request_cache, obj_request);
2079 }
2080
2081 /* It's OK to call this for a device with no parent */
2082
2083 static void rbd_spec_put(struct rbd_spec *spec);
2084 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2085 {
2086         rbd_dev_remove_parent(rbd_dev);
2087         rbd_spec_put(rbd_dev->parent_spec);
2088         rbd_dev->parent_spec = NULL;
2089         rbd_dev->parent_overlap = 0;
2090 }
2091
2092 /*
2093  * Parent image reference counting is used to determine when an
2094  * image's parent fields can be safely torn down--after there are no
2095  * more in-flight requests to the parent image.  When the last
2096  * reference is dropped, cleaning them up is safe.
2097  */
2098 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2099 {
2100         int counter;
2101
2102         if (!rbd_dev->parent_spec)
2103                 return;
2104
2105         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2106         if (counter > 0)
2107                 return;
2108
2109         /* Last reference; clean up parent data structures */
2110
2111         if (!counter)
2112                 rbd_dev_unparent(rbd_dev);
2113         else
2114                 rbd_warn(rbd_dev, "parent reference underflow");
2115 }
2116
2117 /*
2118  * If an image has a non-zero parent overlap, get a reference to its
2119  * parent.
2120  *
2121  * Returns true if the rbd device has a parent with a non-zero
2122  * overlap and a reference for it was successfully taken, or
2123  * false otherwise.
2124  */
2125 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2126 {
2127         int counter = 0;
2128
2129         if (!rbd_dev->parent_spec)
2130                 return false;
2131
2132         down_read(&rbd_dev->header_rwsem);
2133         if (rbd_dev->parent_overlap)
2134                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2135         up_read(&rbd_dev->header_rwsem);
2136
2137         if (counter < 0)
2138                 rbd_warn(rbd_dev, "parent reference overflow");
2139
2140         return counter > 0;
2141 }
2142
2143 /*
2144  * Caller is responsible for filling in the list of object requests
2145  * that comprises the image request, and the Linux request pointer
2146  * (if there is one).
2147  */
2148 static struct rbd_img_request *rbd_img_request_create(
2149                                         struct rbd_device *rbd_dev,
2150                                         u64 offset, u64 length,
2151                                         enum obj_operation_type op_type,
2152                                         struct ceph_snap_context *snapc)
2153 {
2154         struct rbd_img_request *img_request;
2155
2156         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2157         if (!img_request)
2158                 return NULL;
2159
2160         img_request->rq = NULL;
2161         img_request->rbd_dev = rbd_dev;
2162         img_request->offset = offset;
2163         img_request->length = length;
2164         img_request->flags = 0;
2165         if (op_type == OBJ_OP_DISCARD) {
2166                 img_request_discard_set(img_request);
2167                 img_request->snapc = snapc;
2168         } else if (op_type == OBJ_OP_WRITE) {
2169                 img_request_write_set(img_request);
2170                 img_request->snapc = snapc;
2171         } else {
2172                 img_request->snap_id = rbd_dev->spec->snap_id;
2173         }
2174         if (rbd_dev_parent_get(rbd_dev))
2175                 img_request_layered_set(img_request);
2176         spin_lock_init(&img_request->completion_lock);
2177         img_request->next_completion = 0;
2178         img_request->callback = NULL;
2179         img_request->result = 0;
2180         img_request->obj_request_count = 0;
2181         INIT_LIST_HEAD(&img_request->obj_requests);
2182         kref_init(&img_request->kref);
2183
2184         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2185                 obj_op_name(op_type), offset, length, img_request);
2186
2187         return img_request;
2188 }
2189
2190 static void rbd_img_request_destroy(struct kref *kref)
2191 {
2192         struct rbd_img_request *img_request;
2193         struct rbd_obj_request *obj_request;
2194         struct rbd_obj_request *next_obj_request;
2195
2196         img_request = container_of(kref, struct rbd_img_request, kref);
2197
2198         dout("%s: img %p\n", __func__, img_request);
2199
2200         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2201                 rbd_img_obj_request_del(img_request, obj_request);
2202         rbd_assert(img_request->obj_request_count == 0);
2203
2204         if (img_request_layered_test(img_request)) {
2205                 img_request_layered_clear(img_request);
2206                 rbd_dev_parent_put(img_request->rbd_dev);
2207         }
2208
2209         if (img_request_write_test(img_request) ||
2210                 img_request_discard_test(img_request))
2211                 ceph_put_snap_context(img_request->snapc);
2212
2213         kmem_cache_free(rbd_img_request_cache, img_request);
2214 }
2215
2216 static struct rbd_img_request *rbd_parent_request_create(
2217                                         struct rbd_obj_request *obj_request,
2218                                         u64 img_offset, u64 length)
2219 {
2220         struct rbd_img_request *parent_request;
2221         struct rbd_device *rbd_dev;
2222
2223         rbd_assert(obj_request->img_request);
2224         rbd_dev = obj_request->img_request->rbd_dev;
2225
2226         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2227                                                 length, OBJ_OP_READ, NULL);
2228         if (!parent_request)
2229                 return NULL;
2230
2231         img_request_child_set(parent_request);
2232         rbd_obj_request_get(obj_request);
2233         parent_request->obj_request = obj_request;
2234
2235         return parent_request;
2236 }
2237
2238 static void rbd_parent_request_destroy(struct kref *kref)
2239 {
2240         struct rbd_img_request *parent_request;
2241         struct rbd_obj_request *orig_request;
2242
2243         parent_request = container_of(kref, struct rbd_img_request, kref);
2244         orig_request = parent_request->obj_request;
2245
2246         parent_request->obj_request = NULL;
2247         rbd_obj_request_put(orig_request);
2248         img_request_child_clear(parent_request);
2249
2250         rbd_img_request_destroy(kref);
2251 }
2252
2253 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2254 {
2255         struct rbd_img_request *img_request;
2256         unsigned int xferred;
2257         int result;
2258         bool more;
2259
2260         rbd_assert(obj_request_img_data_test(obj_request));
2261         img_request = obj_request->img_request;
2262
2263         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2264         xferred = (unsigned int)obj_request->xferred;
2265         result = obj_request->result;
2266         if (result) {
2267                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2268                 enum obj_operation_type op_type;
2269
2270                 if (img_request_discard_test(img_request))
2271                         op_type = OBJ_OP_DISCARD;
2272                 else if (img_request_write_test(img_request))
2273                         op_type = OBJ_OP_WRITE;
2274                 else
2275                         op_type = OBJ_OP_READ;
2276
2277                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2278                         obj_op_name(op_type), obj_request->length,
2279                         obj_request->img_offset, obj_request->offset);
2280                 rbd_warn(rbd_dev, "  result %d xferred %x",
2281                         result, xferred);
2282                 if (!img_request->result)
2283                         img_request->result = result;
2284                 /*
2285                  * Need to end I/O on the entire obj_request worth of
2286                  * bytes in case of error.
2287                  */
2288                 xferred = obj_request->length;
2289         }
2290
2291         if (img_request_child_test(img_request)) {
2292                 rbd_assert(img_request->obj_request != NULL);
2293                 more = obj_request->which < img_request->obj_request_count - 1;
2294         } else {
2295                 rbd_assert(img_request->rq != NULL);
2296
2297                 more = blk_update_request(img_request->rq, result, xferred);
2298                 if (!more)
2299                         __blk_mq_end_request(img_request->rq, result);
2300         }
2301
2302         return more;
2303 }
2304
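/*
 * Per-object completion callback for image requests.  Object requests
 * are ended in index order: next_completion tracks the lowest index
 * that has not yet been ended, so a request completing out of order
 * simply waits here until its predecessors have been processed.
 */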
2305 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2306 {
2307         struct rbd_img_request *img_request;
2308         u32 which = obj_request->which;
2309         bool more = true;
2310
2311         rbd_assert(obj_request_img_data_test(obj_request));
2312         img_request = obj_request->img_request;
2313
2314         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2315         rbd_assert(img_request != NULL);
2316         rbd_assert(img_request->obj_request_count > 0);
2317         rbd_assert(which != BAD_WHICH);
2318         rbd_assert(which < img_request->obj_request_count);
2319
2320         spin_lock_irq(&img_request->completion_lock);
2321         if (which != img_request->next_completion)
2322                 goto out;
2323
2324         for_each_obj_request_from(img_request, obj_request) {
2325                 rbd_assert(more);
2326                 rbd_assert(which < img_request->obj_request_count);
2327
2328                 if (!obj_request_done_test(obj_request))
2329                         break;
2330                 more = rbd_img_obj_end_request(obj_request);
2331                 which++;
2332         }
2333
2334         rbd_assert(more ^ (which == img_request->obj_request_count));
2335         img_request->next_completion = which;
2336 out:
2337         spin_unlock_irq(&img_request->completion_lock);
2338         rbd_img_request_put(img_request);
2339
2340         if (!more)
2341                 rbd_img_request_complete(img_request);
2342 }
2343
2344 /*
2345  * Add individual osd ops to the given ceph_osd_request and prepare
2346  * them for submission. num_ops is the current number of
2347  * osd operations already added to the object request.
2348  */
2349 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2350                                 struct ceph_osd_request *osd_request,
2351                                 enum obj_operation_type op_type,
2352                                 unsigned int num_ops)
2353 {
2354         struct rbd_img_request *img_request = obj_request->img_request;
2355         struct rbd_device *rbd_dev = img_request->rbd_dev;
2356         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2357         u64 offset = obj_request->offset;
2358         u64 length = obj_request->length;
2359         u64 img_end;
2360         u16 opcode;
2361
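        /*
         * For a discard, pick the cheapest op that covers the range:
         * delete the whole object when the discard covers it entirely
         * and no parent data backs it, truncate when the range runs to
         * the end of the object (or of the image), and zero otherwise.
         */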
2362         if (op_type == OBJ_OP_DISCARD) {
2363                 if (!offset && length == object_size &&
2364                     (!img_request_layered_test(img_request) ||
2365                      !obj_request_overlaps_parent(obj_request))) {
2366                         opcode = CEPH_OSD_OP_DELETE;
2367                 } else if (offset + length == object_size) {
2368                         opcode = CEPH_OSD_OP_TRUNCATE;
2369                 } else {
2370                         down_read(&rbd_dev->header_rwsem);
2371                         img_end = rbd_dev->header.image_size;
2372                         up_read(&rbd_dev->header_rwsem);
2373
2374                         if (obj_request->img_offset + length == img_end)
2375                                 opcode = CEPH_OSD_OP_TRUNCATE;
2376                         else
2377                                 opcode = CEPH_OSD_OP_ZERO;
2378                 }
2379         } else if (op_type == OBJ_OP_WRITE) {
2380                 if (!offset && length == object_size)
2381                         opcode = CEPH_OSD_OP_WRITEFULL;
2382                 else
2383                         opcode = CEPH_OSD_OP_WRITE;
2384                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2385                                         object_size, object_size);
2386                 num_ops++;
2387         } else {
2388                 opcode = CEPH_OSD_OP_READ;
2389         }
2390
2391         if (opcode == CEPH_OSD_OP_DELETE)
2392                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2393         else
2394                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2395                                        offset, length, 0, 0);
2396
2397         if (obj_request->type == OBJ_REQUEST_BIO)
2398                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2399                                         obj_request->bio_list, length);
2400         else if (obj_request->type == OBJ_REQUEST_PAGES)
2401                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2402                                         obj_request->pages, length,
2403                                         offset & ~PAGE_MASK, false, false);
2404
2405         /* Discards are also writes */
2406         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2407                 rbd_osd_req_format_write(obj_request);
2408         else
2409                 rbd_osd_req_format_read(obj_request);
2410 }
2411
2412 /*
2413  * Split up an image request into one or more object requests, each
2414  * to a different object.  The "type" parameter indicates whether
2415  * "data_desc" is the pointer to the head of a list of bio
2416  * structures, or the base of a page array.  In either case this
2417  * function assumes data_desc describes memory sufficient to hold
2418  * all data described by the image request.
2419  */
2420 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2421                                         enum obj_request_type type,
2422                                         void *data_desc)
2423 {
2424         struct rbd_device *rbd_dev = img_request->rbd_dev;
2425         struct rbd_obj_request *obj_request = NULL;
2426         struct rbd_obj_request *next_obj_request;
2427         struct bio *bio_list = NULL;
2428         unsigned int bio_offset = 0;
2429         struct page **pages = NULL;
2430         enum obj_operation_type op_type;
2431         u64 img_offset;
2432         u64 resid;
2433
2434         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2435                 (int)type, data_desc);
2436
2437         img_offset = img_request->offset;
2438         resid = img_request->length;
2439         rbd_assert(resid > 0);
2440         op_type = rbd_img_request_op_type(img_request);
2441
2442         if (type == OBJ_REQUEST_BIO) {
2443                 bio_list = data_desc;
2444                 rbd_assert(img_offset ==
2445                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2446         } else if (type == OBJ_REQUEST_PAGES) {
2447                 pages = data_desc;
2448         }
2449
2450         while (resid) {
2451                 struct ceph_osd_request *osd_req;
2452                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2453                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2454                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2455
2456                 obj_request = rbd_obj_request_create(type);
2457                 if (!obj_request)
2458                         goto out_unwind;
2459
2460                 obj_request->object_no = object_no;
2461                 obj_request->offset = offset;
2462                 obj_request->length = length;
2463
2464                 /*
2465                  * set obj_request->img_request before creating the
2466                  * osd_request so that it gets the right snapc
2467                  */
2468                 rbd_img_obj_request_add(img_request, obj_request);
2469
2470                 if (type == OBJ_REQUEST_BIO) {
2471                         unsigned int clone_size;
2472
2473                         rbd_assert(length <= (u64)UINT_MAX);
2474                         clone_size = (unsigned int)length;
2475                         obj_request->bio_list =
2476                                         bio_chain_clone_range(&bio_list,
2477                                                                 &bio_offset,
2478                                                                 clone_size,
2479                                                                 GFP_NOIO);
2480                         if (!obj_request->bio_list)
2481                                 goto out_unwind;
2482                 } else if (type == OBJ_REQUEST_PAGES) {
2483                         unsigned int page_count;
2484
2485                         obj_request->pages = pages;
2486                         page_count = (u32)calc_pages_for(offset, length);
2487                         obj_request->page_count = page_count;
2488                         if ((offset + length) & ~PAGE_MASK)
2489                                 page_count--;   /* more on last page */
2490                         pages += page_count;
2491                 }
2492
2493                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2494                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2495                                         obj_request);
2496                 if (!osd_req)
2497                         goto out_unwind;
2498
2499                 obj_request->osd_req = osd_req;
2500                 obj_request->callback = rbd_img_obj_callback;
2501                 obj_request->img_offset = img_offset;
2502
2503                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2504
2505                 img_offset += length;
2506                 resid -= length;
2507         }
2508
2509         return 0;
2510
2511 out_unwind:
2512         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2513                 rbd_img_obj_request_del(img_request, obj_request);
2514
2515         return -ENOMEM;
2516 }
2517
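/*
 * Called when the copyup method call issued for a layered write (or
 * discard) completes.  The pages that held the parent data are
 * released and, on success, the transfer count is set to the
 * originally requested length.
 */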
2518 static void
2519 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2520 {
2521         struct rbd_img_request *img_request;
2522         struct rbd_device *rbd_dev;
2523         struct page **pages;
2524         u32 page_count;
2525
2526         dout("%s: obj %p\n", __func__, obj_request);
2527
2528         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2529                 obj_request->type == OBJ_REQUEST_NODATA);
2530         rbd_assert(obj_request_img_data_test(obj_request));
2531         img_request = obj_request->img_request;
2532         rbd_assert(img_request);
2533
2534         rbd_dev = img_request->rbd_dev;
2535         rbd_assert(rbd_dev);
2536
2537         pages = obj_request->copyup_pages;
2538         rbd_assert(pages != NULL);
2539         obj_request->copyup_pages = NULL;
2540         page_count = obj_request->copyup_page_count;
2541         rbd_assert(page_count);
2542         obj_request->copyup_page_count = 0;
2543         ceph_release_page_vector(pages, page_count);
2544
2545         /*
2546          * We want the transfer count to reflect the size of the
2547          * original write request.  There is no such thing as a
2548          * successful short write, so if the request was successful
2549          * we can just set it to the originally-requested length.
2550          */
2551         if (!obj_request->result)
2552                 obj_request->xferred = obj_request->length;
2553
2554         obj_request_done_set(obj_request);
2555 }
2556
2557 static void
2558 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2559 {
2560         struct rbd_obj_request *orig_request;
2561         struct ceph_osd_request *osd_req;
2562         struct rbd_device *rbd_dev;
2563         struct page **pages;
2564         enum obj_operation_type op_type;
2565         u32 page_count;
2566         int img_result;
2567         u64 parent_length;
2568
2569         rbd_assert(img_request_child_test(img_request));
2570
2571         /* First get what we need from the image request */
2572
2573         pages = img_request->copyup_pages;
2574         rbd_assert(pages != NULL);
2575         img_request->copyup_pages = NULL;
2576         page_count = img_request->copyup_page_count;
2577         rbd_assert(page_count);
2578         img_request->copyup_page_count = 0;
2579
2580         orig_request = img_request->obj_request;
2581         rbd_assert(orig_request != NULL);
2582         rbd_assert(obj_request_type_valid(orig_request->type));
2583         img_result = img_request->result;
2584         parent_length = img_request->length;
2585         rbd_assert(img_result || parent_length == img_request->xferred);
2586         rbd_img_request_put(img_request);
2587
2588         rbd_assert(orig_request->img_request);
2589         rbd_dev = orig_request->img_request->rbd_dev;
2590         rbd_assert(rbd_dev);
2591
2592         /*
2593          * If the overlap has become 0 (most likely because the
2594          * image has been flattened) we need to free the pages
2595          * and re-submit the original write request.
2596          */
2597         if (!rbd_dev->parent_overlap) {
2598                 ceph_release_page_vector(pages, page_count);
2599                 rbd_obj_request_submit(orig_request);
2600                 return;
2601         }
2602
2603         if (img_result)
2604                 goto out_err;
2605
2606         /*
2607  * The original osd request is of no use to us any more.
2608          * We need a new one that can hold the three ops in a copyup
2609          * request.  Allocate the new copyup osd request for the
2610          * original request, and release the old one.
2611          */
2612         img_result = -ENOMEM;
2613         osd_req = rbd_osd_req_create_copyup(orig_request);
2614         if (!osd_req)
2615                 goto out_err;
2616         rbd_osd_req_destroy(orig_request->osd_req);
2617         orig_request->osd_req = osd_req;
2618         orig_request->copyup_pages = pages;
2619         orig_request->copyup_page_count = page_count;
2620
2621         /* Initialize the copyup op */
2622
2623         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2624         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2625                                                 false, false);
2626
2627         /* Add the other op(s) */
2628
2629         op_type = rbd_img_request_op_type(orig_request->img_request);
2630         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2631
2632         /* All set, send it off. */
2633
2634         rbd_obj_request_submit(orig_request);
2635         return;
2636
2637 out_err:
2638         ceph_release_page_vector(pages, page_count);
2639         rbd_obj_request_error(orig_request, img_result);
2640 }
2641
2642 /*
2643  * Read from the parent image the range of data that covers the
2644  * entire target of the given object request.  This is used for
2645  * satisfying a layered image write request when the target of an
2646  * object request from the image request does not exist.
2647  *
2648  * A page array big enough to hold the returned data is allocated
2649  * and supplied to rbd_img_request_fill() as the "data descriptor."
2650  * When the read completes, this page array will be transferred to
2651  * the original object request for the copyup operation.
2652  *
2653  * If an error occurs, it is recorded as the result of the original
2654  * object request in rbd_img_obj_exists_callback().
2655  */
2656 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2657 {
2658         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2659         struct rbd_img_request *parent_request = NULL;
2660         u64 img_offset;
2661         u64 length;
2662         struct page **pages = NULL;
2663         u32 page_count;
2664         int result;
2665
2666         rbd_assert(rbd_dev->parent != NULL);
2667
2668         /*
2669          * Determine the byte range covered by the object in the
2670          * child image to which the original request was to be sent.
2671          */
2672         img_offset = obj_request->img_offset - obj_request->offset;
2673         length = rbd_obj_bytes(&rbd_dev->header);
2674
2675         /*
2676          * There is no defined parent data beyond the parent
2677          * overlap, so limit what we read at that boundary if
2678          * necessary.
2679          */
2680         if (img_offset + length > rbd_dev->parent_overlap) {
2681                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2682                 length = rbd_dev->parent_overlap - img_offset;
2683         }
2684
2685         /*
2686          * Allocate a page array big enough to receive the data read
2687          * from the parent.
2688          */
2689         page_count = (u32)calc_pages_for(0, length);
2690         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2691         if (IS_ERR(pages)) {
2692                 result = PTR_ERR(pages);
2693                 pages = NULL;
2694                 goto out_err;
2695         }
2696
2697         result = -ENOMEM;
2698         parent_request = rbd_parent_request_create(obj_request,
2699                                                 img_offset, length);
2700         if (!parent_request)
2701                 goto out_err;
2702
2703         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2704         if (result)
2705                 goto out_err;
2706
2707         parent_request->copyup_pages = pages;
2708         parent_request->copyup_page_count = page_count;
2709         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2710
2711         result = rbd_img_request_submit(parent_request);
2712         if (!result)
2713                 return 0;
2714
2715         parent_request->copyup_pages = NULL;
2716         parent_request->copyup_page_count = 0;
2717         parent_request->obj_request = NULL;
2718         rbd_obj_request_put(obj_request);
2719 out_err:
2720         if (pages)
2721                 ceph_release_page_vector(pages, page_count);
2722         if (parent_request)
2723                 rbd_img_request_put(parent_request);
2724         return result;
2725 }
2726
2727 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2728 {
2729         struct rbd_obj_request *orig_request;
2730         struct rbd_device *rbd_dev;
2731         int result;
2732
2733         rbd_assert(!obj_request_img_data_test(obj_request));
2734
2735         /*
2736          * All we need from the object request is the original
2737          * request and the result of the STAT op.  Grab those, then
2738          * we're done with the request.
2739          */
2740         orig_request = obj_request->obj_request;
2741         obj_request->obj_request = NULL;
2742         rbd_obj_request_put(orig_request);
2743         rbd_assert(orig_request);
2744         rbd_assert(orig_request->img_request);
2745
2746         result = obj_request->result;
2747         obj_request->result = 0;
2748
2749         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2750                 obj_request, orig_request, result,
2751                 obj_request->xferred, obj_request->length);
2752         rbd_obj_request_put(obj_request);
2753
2754         /*
2755          * If the overlap has become 0 (most likely because the
2756          * image has been flattened) we need to re-submit the
2757          * original request.
2758          */
2759         rbd_dev = orig_request->img_request->rbd_dev;
2760         if (!rbd_dev->parent_overlap) {
2761                 rbd_obj_request_submit(orig_request);
2762                 return;
2763         }
2764
2765         /*
2766          * Our only purpose here is to determine whether the object
2767          * exists, and we don't want to treat the non-existence as
2768          * an error.  If something else comes back, transfer the
2769          * error to the original request and complete it now.
2770          */
2771         if (!result) {
2772                 obj_request_existence_set(orig_request, true);
2773         } else if (result == -ENOENT) {
2774                 obj_request_existence_set(orig_request, false);
2775         } else {
2776                 goto fail_orig_request;
2777         }
2778
2779         /*
2780          * Resubmit the original request now that we have recorded
2781          * whether the target object exists.
2782          */
2783         result = rbd_img_obj_request_submit(orig_request);
2784         if (result)
2785                 goto fail_orig_request;
2786
2787         return;
2788
2789 fail_orig_request:
2790         rbd_obj_request_error(orig_request, result);
2791 }
2792
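/*
 * Issue a STAT request to find out whether the target object of a
 * layered write exists.  The answer is recorded on the original object
 * request by rbd_img_obj_exists_callback(), which then resubmits it.
 */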
2793 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2794 {
2795         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2796         struct rbd_obj_request *stat_request;
2797         struct page **pages;
2798         u32 page_count;
2799         size_t size;
2800         int ret;
2801
2802         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2803         if (!stat_request)
2804                 return -ENOMEM;
2805
2806         stat_request->object_no = obj_request->object_no;
2807
2808         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2809                                                    stat_request);
2810         if (!stat_request->osd_req) {
2811                 ret = -ENOMEM;
2812                 goto fail_stat_request;
2813         }
2814
2815         /*
2816          * The response data for a STAT call consists of:
2817          *     le64 length;
2818          *     struct {
2819          *         le32 tv_sec;
2820          *         le32 tv_nsec;
2821          *     } mtime;
2822          */
2823         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2824         page_count = (u32)calc_pages_for(0, size);
2825         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2826         if (IS_ERR(pages)) {
2827                 ret = PTR_ERR(pages);
2828                 goto fail_stat_request;
2829         }
2830
2831         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2832         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2833                                      false, false);
2834
2835         rbd_obj_request_get(obj_request);
2836         stat_request->obj_request = obj_request;
2837         stat_request->pages = pages;
2838         stat_request->page_count = page_count;
2839         stat_request->callback = rbd_img_obj_exists_callback;
2840
2841         rbd_obj_request_submit(stat_request);
2842         return 0;
2843
2844 fail_stat_request:
2845         rbd_obj_request_put(stat_request);
2846         return ret;
2847 }
2848
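/*
 * Return true if this object request can be submitted to the OSD
 * directly, i.e. without first checking whether the target object
 * exists or copying up data from the parent image.
 */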
2849 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2850 {
2851         struct rbd_img_request *img_request = obj_request->img_request;
2852         struct rbd_device *rbd_dev = img_request->rbd_dev;
2853
2854         /* Reads */
2855         if (!img_request_write_test(img_request) &&
2856             !img_request_discard_test(img_request))
2857                 return true;
2858
2859         /* Non-layered writes */
2860         if (!img_request_layered_test(img_request))
2861                 return true;
2862
2863         /*
2864          * Layered writes outside of the parent overlap range don't
2865          * share any data with the parent.
2866          */
2867         if (!obj_request_overlaps_parent(obj_request))
2868                 return true;
2869
2870         /*
2871          * Entire-object layered writes - we will overwrite whatever
2872          * parent data there is anyway.
2873          */
2874         if (!obj_request->offset &&
2875             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2876                 return true;
2877
2878         /*
2879          * If the object is known to already exist, its parent data has
2880          * already been copied.
2881          */
2882         if (obj_request_known_test(obj_request) &&
2883             obj_request_exists_test(obj_request))
2884                 return true;
2885
2886         return false;
2887 }
2888
2889 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2890 {
2891         rbd_assert(obj_request_img_data_test(obj_request));
2892         rbd_assert(obj_request_type_valid(obj_request->type));
2893         rbd_assert(obj_request->img_request);
2894
2895         if (img_obj_request_simple(obj_request)) {
2896                 rbd_obj_request_submit(obj_request);
2897                 return 0;
2898         }
2899
2900         /*
2901          * It's a layered write.  The target object might exist but
2902          * we may not know that yet.  If we know it doesn't exist,
2903          * start by reading the data for the full target object from
2904          * the parent so we can use it for a copyup to the target.
2905          */
2906         if (obj_request_known_test(obj_request))
2907                 return rbd_img_obj_parent_read_full(obj_request);
2908
2909         /* We don't know whether the target exists.  Go find out. */
2910
2911         return rbd_img_obj_exists_submit(obj_request);
2912 }
2913
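/*
 * Submit all object requests belonging to an image request.  A rough
 * sketch of the overall flow (the real callers live elsewhere in this
 * file, in the request queue handling code; variable names here are
 * only illustrative):
 *
 *	img_request = rbd_img_request_create(rbd_dev, off, len, op_type,
 *					     snapc);
 *	ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio_list);
 *	if (!ret)
 *		ret = rbd_img_request_submit(img_request);
 *	if (ret)
 *		rbd_img_request_put(img_request);
 */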
2914 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2915 {
2916         struct rbd_obj_request *obj_request;
2917         struct rbd_obj_request *next_obj_request;
2918         int ret = 0;
2919
2920         dout("%s: img %p\n", __func__, img_request);
2921
2922         rbd_img_request_get(img_request);
2923         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2924                 ret = rbd_img_obj_request_submit(obj_request);
2925                 if (ret)
2926                         goto out_put_ireq;
2927         }
2928
2929 out_put_ireq:
2930         rbd_img_request_put(img_request);
2931         return ret;
2932 }
2933
2934 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2935 {
2936         struct rbd_obj_request *obj_request;
2937         struct rbd_device *rbd_dev;
2938         u64 obj_end;
2939         u64 img_xferred;
2940         int img_result;
2941
2942         rbd_assert(img_request_child_test(img_request));
2943
2944         /* First get what we need from the image request and release it */
2945
2946         obj_request = img_request->obj_request;
2947         img_xferred = img_request->xferred;
2948         img_result = img_request->result;
2949         rbd_img_request_put(img_request);
2950
2951         /*
2952          * If the overlap has become 0 (most likely because the
2953          * image has been flattened) we need to re-submit the
2954          * original request.
2955          */
2956         rbd_assert(obj_request);
2957         rbd_assert(obj_request->img_request);
2958         rbd_dev = obj_request->img_request->rbd_dev;
2959         if (!rbd_dev->parent_overlap) {
2960                 rbd_obj_request_submit(obj_request);
2961                 return;
2962         }
2963
2964         obj_request->result = img_result;
2965         if (obj_request->result)
2966                 goto out;
2967
2968         /*
2969          * We need to zero anything beyond the parent overlap
2970          * boundary.  Since rbd_img_obj_request_read_callback()
2971          * will zero anything beyond the end of a short read, an
2972          * easy way to do this is to pretend the data from the
2973          * parent came up short--ending at the overlap boundary.
2974          */
2975         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2976         obj_end = obj_request->img_offset + obj_request->length;
2977         if (obj_end > rbd_dev->parent_overlap) {
2978                 u64 xferred = 0;
2979
2980                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2981                         xferred = rbd_dev->parent_overlap -
2982                                         obj_request->img_offset;
2983
2984                 obj_request->xferred = min(img_xferred, xferred);
2985         } else {
2986                 obj_request->xferred = img_xferred;
2987         }
2988 out:
2989         rbd_img_obj_request_read_callback(obj_request);
2990         rbd_obj_request_complete(obj_request);
2991 }
2992
2993 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2994 {
2995         struct rbd_img_request *img_request;
2996         int result;
2997
2998         rbd_assert(obj_request_img_data_test(obj_request));
2999         rbd_assert(obj_request->img_request != NULL);
3000         rbd_assert(obj_request->result == (s32) -ENOENT);
3001         rbd_assert(obj_request_type_valid(obj_request->type));
3002
3003         /* rbd_read_finish(obj_request, obj_request->length); */
3004         img_request = rbd_parent_request_create(obj_request,
3005                                                 obj_request->img_offset,
3006                                                 obj_request->length);
3007         result = -ENOMEM;
3008         if (!img_request)
3009                 goto out_err;
3010
3011         if (obj_request->type == OBJ_REQUEST_BIO)
3012                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3013                                                 obj_request->bio_list);
3014         else
3015                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3016                                                 obj_request->pages);
3017         if (result)
3018                 goto out_err;
3019
3020         img_request->callback = rbd_img_parent_read_callback;
3021         result = rbd_img_request_submit(img_request);
3022         if (result)
3023                 goto out_err;
3024
3025         return;
3026 out_err:
3027         if (img_request)
3028                 rbd_img_request_put(img_request);
3029         obj_request->result = result;
3030         obj_request->xferred = 0;
3031         obj_request_done_set(obj_request);
3032 }
3033
3034 static const struct rbd_client_id rbd_empty_cid;
3035
3036 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3037                           const struct rbd_client_id *rhs)
3038 {
3039         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3040 }
3041
3042 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3043 {
3044         struct rbd_client_id cid;
3045
3046         mutex_lock(&rbd_dev->watch_mutex);
3047         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3048         cid.handle = rbd_dev->watch_cookie;
3049         mutex_unlock(&rbd_dev->watch_mutex);
3050         return cid;
3051 }
3052
3053 /*
3054  * lock_rwsem must be held for write
3055  */
3056 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3057                               const struct rbd_client_id *cid)
3058 {
3059         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3060              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3061              cid->gid, cid->handle);
3062         rbd_dev->owner_cid = *cid; /* struct */
3063 }
3064
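/*
 * Build the lock cookie: RBD_LOCK_COOKIE_PREFIX followed by the
 * current watch cookie, i.e. "<prefix> <watch_cookie>".  Callers pass
 * a buffer of at least 32 bytes.
 */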
3065 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3066 {
3067         mutex_lock(&rbd_dev->watch_mutex);
3068         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3069         mutex_unlock(&rbd_dev->watch_mutex);
3070 }
3071
3072 /*
3073  * lock_rwsem must be held for write
3074  */
3075 static int rbd_lock(struct rbd_device *rbd_dev)
3076 {
3077         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3078         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3079         char cookie[32];
3080         int ret;
3081
3082         WARN_ON(__rbd_is_lock_owner(rbd_dev));
3083
3084         format_lock_cookie(rbd_dev, cookie);
3085         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3086                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3087                             RBD_LOCK_TAG, "", 0);
3088         if (ret)
3089                 return ret;
3090
3091         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3092         rbd_set_owner_cid(rbd_dev, &cid);
3093         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3094         return 0;
3095 }
3096
3097 /*
3098  * lock_rwsem must be held for write
3099  */
3100 static int rbd_unlock(struct rbd_device *rbd_dev)
3101 {
3102         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3103         char cookie[32];
3104         int ret;
3105
3106         WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3107
3108         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3109
3110         format_lock_cookie(rbd_dev, cookie);
3111         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3112                               RBD_LOCK_NAME, cookie);
3113         if (ret && ret != -ENOENT) {
3114                 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3115                 return ret;
3116         }
3117
3118         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3119         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3120         return 0;
3121 }
3122
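/*
 * Send a lock-related notify on the header object.  The payload is a
 * NotifyMessage: a ceph encoding header followed by the 32-bit notify
 * op and the 64-bit gid and handle of this client, hence the
 * 4 + 8 + 8 byte payload on top of the encoding block header.
 */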
3123 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3124                                 enum rbd_notify_op notify_op,
3125                                 struct page ***preply_pages,
3126                                 size_t *preply_len)
3127 {
3128         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3129         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3130         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3131         char buf[buf_size];
3132         void *p = buf;
3133
3134         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3135
3136         /* encode *LockPayload NotifyMessage (op + ClientId) */
3137         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3138         ceph_encode_32(&p, notify_op);
3139         ceph_encode_64(&p, cid.gid);
3140         ceph_encode_64(&p, cid.handle);
3141
3142         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3143                                 &rbd_dev->header_oloc, buf, buf_size,
3144                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3145 }
3146
3147 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3148                                enum rbd_notify_op notify_op)
3149 {
3150         struct page **reply_pages;
3151         size_t reply_len;
3152
3153         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3154         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3155 }
3156
3157 static void rbd_notify_acquired_lock(struct work_struct *work)
3158 {
3159         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3160                                                   acquired_lock_work);
3161
3162         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3163 }
3164
3165 static void rbd_notify_released_lock(struct work_struct *work)
3166 {
3167         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3168                                                   released_lock_work);
3169
3170         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3171 }
3172
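/*
 * Ask the current lock owner to release the exclusive lock.  The
 * notify reply carries one ack per watcher; only the owner responds
 * with a non-empty ResponseMessage, whose 32-bit result becomes our
 * return value.  No response at all is treated as a missing owner
 * (-ETIMEDOUT), duplicate responses as -EIO.
 */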
3173 static int rbd_request_lock(struct rbd_device *rbd_dev)
3174 {
3175         struct page **reply_pages;
3176         size_t reply_len;
3177         bool lock_owner_responded = false;
3178         int ret;
3179
3180         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3181
3182         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3183                                    &reply_pages, &reply_len);
3184         if (ret && ret != -ETIMEDOUT) {
3185                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3186                 goto out;
3187         }
3188
3189         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3190                 void *p = page_address(reply_pages[0]);
3191                 void *const end = p + reply_len;
3192                 u32 n;
3193
3194                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3195                 while (n--) {
3196                         u8 struct_v;
3197                         u32 len;
3198
3199                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3200                         p += 8 + 8; /* skip gid and cookie */
3201
3202                         ceph_decode_32_safe(&p, end, len, e_inval);
3203                         if (!len)
3204                                 continue;
3205
3206                         if (lock_owner_responded) {
3207                                 rbd_warn(rbd_dev,
3208                                          "duplicate lock owners detected");
3209                                 ret = -EIO;
3210                                 goto out;
3211                         }
3212
3213                         lock_owner_responded = true;
3214                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3215                                                   &struct_v, &len);
3216                         if (ret) {
3217                                 rbd_warn(rbd_dev,
3218                                          "failed to decode ResponseMessage: %d",
3219                                          ret);
3220                                 goto e_inval;
3221                         }
3222
3223                         ret = ceph_decode_32(&p);
3224                 }
3225         }
3226
3227         if (!lock_owner_responded) {
3228                 rbd_warn(rbd_dev, "no lock owners detected");
3229                 ret = -ETIMEDOUT;
3230         }
3231
3232 out:
3233         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3234         return ret;
3235
3236 e_inval:
3237         ret = -EINVAL;
3238         goto out;
3239 }
3240
3241 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3242 {
3243         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3244
3245         cancel_delayed_work(&rbd_dev->lock_dwork);
3246         if (wake_all)
3247                 wake_up_all(&rbd_dev->lock_waitq);
3248         else
3249                 wake_up(&rbd_dev->lock_waitq);
3250 }
3251
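/*
 * Fetch the current lockers of the header object and sanity-check
 * them: the lock must carry the rbd tag, be exclusive, and use a
 * cookie with the rbd lock cookie prefix; anything else means the
 * image is locked by an external mechanism and we return -EBUSY.
 */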
3252 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3253                                struct ceph_locker **lockers, u32 *num_lockers)
3254 {
3255         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3256         u8 lock_type;
3257         char *lock_tag;
3258         int ret;
3259
3260         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3261
3262         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3263                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3264                                  &lock_type, &lock_tag, lockers, num_lockers);
3265         if (ret)
3266                 return ret;
3267
3268         if (*num_lockers == 0) {
3269                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3270                 goto out;
3271         }
3272
3273         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3274                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3275                          lock_tag);
3276                 ret = -EBUSY;
3277                 goto out;
3278         }
3279
3280         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3281                 rbd_warn(rbd_dev, "shared lock type detected");
3282                 ret = -EBUSY;
3283                 goto out;
3284         }
3285
3286         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3287                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3288                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3289                          (*lockers)[0].id.cookie);
3290                 ret = -EBUSY;
3291                 goto out;
3292         }
3293
3294 out:
3295         kfree(lock_tag);
3296         return ret;
3297 }
3298
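/*
 * Determine whether the given locker still has a watch established on
 * the header object (i.e. is still alive).  Returns 1 and records the
 * owner's client id if a matching watcher is found, 0 if not, or a
 * negative error code.
 */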
3299 static int find_watcher(struct rbd_device *rbd_dev,
3300                         const struct ceph_locker *locker)
3301 {
3302         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3303         struct ceph_watch_item *watchers;
3304         u32 num_watchers;
3305         u64 cookie;
3306         int i;
3307         int ret;
3308
3309         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3310                                       &rbd_dev->header_oloc, &watchers,
3311                                       &num_watchers);
3312         if (ret)
3313                 return ret;
3314
3315         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3316         for (i = 0; i < num_watchers; i++) {
3317                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3318                             sizeof(locker->info.addr)) &&
3319                     watchers[i].cookie == cookie) {
3320                         struct rbd_client_id cid = {
3321                                 .gid = le64_to_cpu(watchers[i].name.num),
3322                                 .handle = cookie,
3323                         };
3324
3325                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3326                              rbd_dev, cid.gid, cid.handle);
3327                         rbd_set_owner_cid(rbd_dev, &cid);
3328                         ret = 1;
3329                         goto out;
3330                 }
3331         }
3332
3333         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3334         ret = 0;
3335 out:
3336         kfree(watchers);
3337         return ret;
3338 }
3339
3340 /*
3341  * lock_rwsem must be held for write
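 *
 * Try to take the exclusive lock.  If it is held by another client,
 * check whether that client still has a watch on the header object;
 * a holder without a watch is assumed dead, so it is blacklisted and
 * its lock broken before retrying.  A return of 0 with the lock still
 * unowned means a live owner was found and the lock must be requested
 * from it.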
3342  */
3343 static int rbd_try_lock(struct rbd_device *rbd_dev)
3344 {
3345         struct ceph_client *client = rbd_dev->rbd_client->client;
3346         struct ceph_locker *lockers;
3347         u32 num_lockers;
3348         int ret;
3349
3350         for (;;) {
3351                 ret = rbd_lock(rbd_dev);
3352                 if (ret != -EBUSY)
3353                         return ret;
3354
3355                 /* determine if the current lock holder is still alive */
3356                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3357                 if (ret)
3358                         return ret;
3359
3360                 if (num_lockers == 0)
3361                         goto again;
3362
3363                 ret = find_watcher(rbd_dev, lockers);
3364                 if (ret) {
3365                         if (ret > 0)
3366                                 ret = 0; /* have to request lock */
3367                         goto out;
3368                 }
3369
3370                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3371                          ENTITY_NAME(lockers[0].id.name));
3372
3373                 ret = ceph_monc_blacklist_add(&client->monc,
3374                                               &lockers[0].info.addr);
3375                 if (ret) {
3376                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3377                                  ENTITY_NAME(lockers[0].id.name), ret);
3378                         goto out;
3379                 }
3380
3381                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3382                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3383                                           lockers[0].id.cookie,
3384                                           &lockers[0].id.name);
3385                 if (ret && ret != -ENOENT)
3386                         goto out;
3387
3388 again:
3389                 ceph_free_lockers(lockers, num_lockers);
3390         }
3391
3392 out:
3393         ceph_free_lockers(lockers, num_lockers);
3394         return ret;
3395 }
3396
3397 /*
3398  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3399  */
3400 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3401                                                 int *pret)
3402 {
3403         enum rbd_lock_state lock_state;
3404
3405         down_read(&rbd_dev->lock_rwsem);
3406         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3407              rbd_dev->lock_state);
3408         if (__rbd_is_lock_owner(rbd_dev)) {
3409                 lock_state = rbd_dev->lock_state;
3410                 up_read(&rbd_dev->lock_rwsem);
3411                 return lock_state;
3412         }
3413
3414         up_read(&rbd_dev->lock_rwsem);
3415         down_write(&rbd_dev->lock_rwsem);
3416         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3417              rbd_dev->lock_state);
3418         if (!__rbd_is_lock_owner(rbd_dev)) {
3419                 *pret = rbd_try_lock(rbd_dev);
3420                 if (*pret)
3421                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3422         }
3423
3424         lock_state = rbd_dev->lock_state;
3425         up_write(&rbd_dev->lock_rwsem);
3426         return lock_state;
3427 }
3428
3429 static void rbd_acquire_lock(struct work_struct *work)
3430 {
3431         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3432                                             struct rbd_device, lock_dwork);
3433         enum rbd_lock_state lock_state;
3434         int ret;
3435
3436         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3437 again:
3438         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3439         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3440                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3441                         wake_requests(rbd_dev, true);
3442                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3443                      rbd_dev, lock_state, ret);
3444                 return;
3445         }
3446
3447         ret = rbd_request_lock(rbd_dev);
3448         if (ret == -ETIMEDOUT) {
3449                 goto again; /* treat this as a dead client */
3450         } else if (ret < 0) {
3451                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3452                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3453                                  RBD_RETRY_DELAY);
3454         } else {
3455                 /*
3456                  * lock owner acked, but resend if we don't see them
3457                  * release the lock
3458                  */
3459                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3460                      rbd_dev);
3461                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3462                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3463         }
3464 }
3465
3466 /*
3467  * lock_rwsem must be held for write
3468  */
3469 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3470 {
3471         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3472              rbd_dev->lock_state);
3473         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3474                 return false;
3475
3476         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3477         downgrade_write(&rbd_dev->lock_rwsem);
3478         /*
3479          * Ensure that all in-flight IO is flushed.
3480          *
3481          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3482          * may be shared with other devices.
3483          */
3484         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3485         up_read(&rbd_dev->lock_rwsem);
3486
3487         down_write(&rbd_dev->lock_rwsem);
3488         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3489              rbd_dev->lock_state);
3490         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3491                 return false;
3492
3493         if (!rbd_unlock(rbd_dev))
3494                 /*
3495                  * Give others a chance to grab the lock - we would re-acquire
3496                  * almost immediately if we got new IO during ceph_osdc_sync()
3497                  * otherwise.  We need to ack our own notifications, so this
3498                  * lock_dwork will be requeued from rbd_wait_state_locked()
3499                  * after wake_requests() in rbd_handle_released_lock().
3500                  */
3501                 cancel_delayed_work(&rbd_dev->lock_dwork);
3502
3503         return true;
3504 }
3505
3506 static void rbd_release_lock_work(struct work_struct *work)
3507 {
3508         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3509                                                   unlock_work);
3510
3511         down_write(&rbd_dev->lock_rwsem);
3512         rbd_release_lock(rbd_dev);
3513         up_write(&rbd_dev->lock_rwsem);
3514 }
3515
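/*
 * Handlers for lock-related notifications.  Version 2 and later
 * messages carry the sender's client id; older messages do not, in
 * which case cid stays zero (== rbd_empty_cid) and the recorded owner
 * is left unchanged.
 */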
3516 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3517                                      void **p)
3518 {
3519         struct rbd_client_id cid = { 0 };
3520
3521         if (struct_v >= 2) {
3522                 cid.gid = ceph_decode_64(p);
3523                 cid.handle = ceph_decode_64(p);
3524         }
3525
3526         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3527              cid.handle);
3528         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3529                 down_write(&rbd_dev->lock_rwsem);
3530                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3531                         /*
3532                          * we already know that the remote client is
3533                          * the owner
3534                          */
3535                         up_write(&rbd_dev->lock_rwsem);
3536                         return;
3537                 }
3538
3539                 rbd_set_owner_cid(rbd_dev, &cid);
3540                 downgrade_write(&rbd_dev->lock_rwsem);
3541         } else {
3542                 down_read(&rbd_dev->lock_rwsem);
3543         }
3544
3545         if (!__rbd_is_lock_owner(rbd_dev))
3546                 wake_requests(rbd_dev, false);
3547         up_read(&rbd_dev->lock_rwsem);
3548 }
3549
3550 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3551                                      void **p)
3552 {
3553         struct rbd_client_id cid = { 0 };
3554
3555         if (struct_v >= 2) {
3556                 cid.gid = ceph_decode_64(p);
3557                 cid.handle = ceph_decode_64(p);
3558         }
3559
3560         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3561              cid.handle);
3562         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3563                 down_write(&rbd_dev->lock_rwsem);
3564                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3565                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3566                              __func__, rbd_dev, cid.gid, cid.handle,
3567                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3568                         up_write(&rbd_dev->lock_rwsem);
3569                         return;
3570                 }
3571
3572                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3573                 downgrade_write(&rbd_dev->lock_rwsem);
3574         } else {
3575                 down_read(&rbd_dev->lock_rwsem);
3576         }
3577
3578         if (!__rbd_is_lock_owner(rbd_dev))
3579                 wake_requests(rbd_dev, false);
3580         up_read(&rbd_dev->lock_rwsem);
3581 }
3582
3583 static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3584                                     void **p)
3585 {
3586         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3587         struct rbd_client_id cid = { 0 };
3588         bool need_to_send;
3589
3590         if (struct_v >= 2) {
3591                 cid.gid = ceph_decode_64(p);
3592                 cid.handle = ceph_decode_64(p);
3593         }
3594
3595         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3596              cid.handle);
3597         if (rbd_cid_equal(&cid, &my_cid))
3598                 return false;
3599
3600         down_read(&rbd_dev->lock_rwsem);
3601         need_to_send = __rbd_is_lock_owner(rbd_dev);
3602         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3603                 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3604                         dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3605                              rbd_dev);
3606                         queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3607                 }
3608         }
3609         up_read(&rbd_dev->lock_rwsem);
3610         return need_to_send;
3611 }
3612
3613 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3614                                      u64 notify_id, u64 cookie, s32 *result)
3615 {
3616         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3617         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3618         char buf[buf_size];
3619         int ret;
3620
3621         if (result) {
3622                 void *p = buf;
3623
3624                 /* encode ResponseMessage */
3625                 ceph_start_encoding(&p, 1, 1,
3626                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3627                 ceph_encode_32(&p, *result);
3628         } else {
3629                 buf_size = 0;
3630         }
3631
3632         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3633                                    &rbd_dev->header_oloc, notify_id, cookie,
3634                                    buf, buf_size);
3635         if (ret)
3636                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3637 }
3638
3639 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3640                                    u64 cookie)
3641 {
3642         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3643         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3644 }
3645
3646 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3647                                           u64 notify_id, u64 cookie, s32 result)
3648 {
3649         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3650         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3651 }
3652
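/*
 * Watch callback for the header object.  Dispatches lock
 * acquired/released/request notifications and header updates, and
 * acknowledges every notification; an empty payload is a legacy
 * header-update notification.
 */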
3653 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3654                          u64 notifier_id, void *data, size_t data_len)
3655 {
3656         struct rbd_device *rbd_dev = arg;
3657         void *p = data;
3658         void *const end = p + data_len;
3659         u8 struct_v = 0;
3660         u32 len;
3661         u32 notify_op;
3662         int ret;
3663
3664         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3665              __func__, rbd_dev, cookie, notify_id, data_len);
3666         if (data_len) {
3667                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3668                                           &struct_v, &len);
3669                 if (ret) {
3670                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3671                                  ret);
3672                         return;
3673                 }
3674
3675                 notify_op = ceph_decode_32(&p);
3676         } else {
3677                 /* legacy notification for header updates */
3678                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3679                 len = 0;
3680         }
3681
3682         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3683         switch (notify_op) {
3684         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3685                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3686                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3687                 break;
3688         case RBD_NOTIFY_OP_RELEASED_LOCK:
3689                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3690                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3691                 break;
3692         case RBD_NOTIFY_OP_REQUEST_LOCK:
3693                 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3694                         /*
3695                          * send ResponseMessage(0) back so the client
3696                          * can detect a missing owner
3697                          */
3698                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3699                                                       cookie, 0);
3700                 else
3701                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3702                 break;
3703         case RBD_NOTIFY_OP_HEADER_UPDATE:
3704                 ret = rbd_dev_refresh(rbd_dev);
3705                 if (ret)
3706                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3707
3708                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3709                 break;
3710         default:
3711                 if (rbd_is_lock_owner(rbd_dev))
3712                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3713                                                       cookie, -EOPNOTSUPP);
3714                 else
3715                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3716                 break;
3717         }
3718 }
3719
3720 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3721
3722 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3723 {
3724         struct rbd_device *rbd_dev = arg;
3725
3726         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3727
3728         down_write(&rbd_dev->lock_rwsem);
3729         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3730         up_write(&rbd_dev->lock_rwsem);
3731
3732         mutex_lock(&rbd_dev->watch_mutex);
3733         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3734                 __rbd_unregister_watch(rbd_dev);
3735                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3736
3737                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3738         }
3739         mutex_unlock(&rbd_dev->watch_mutex);
3740 }
3741
3742 /*
3743  * watch_mutex must be locked
3744  */
3745 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3746 {
3747         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3748         struct ceph_osd_linger_request *handle;
3749
3750         rbd_assert(!rbd_dev->watch_handle);
3751         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3752
3753         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3754                                  &rbd_dev->header_oloc, rbd_watch_cb,
3755                                  rbd_watch_errcb, rbd_dev);
3756         if (IS_ERR(handle))
3757                 return PTR_ERR(handle);
3758
3759         rbd_dev->watch_handle = handle;
3760         return 0;
3761 }
3762
3763 /*
3764  * watch_mutex must be locked
3765  */
3766 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3767 {
3768         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3769         int ret;
3770
3771         rbd_assert(rbd_dev->watch_handle);
3772         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3773
3774         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3775         if (ret)
3776                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3777
3778         rbd_dev->watch_handle = NULL;
3779 }
3780
3781 static int rbd_register_watch(struct rbd_device *rbd_dev)
3782 {
3783         int ret;
3784
3785         mutex_lock(&rbd_dev->watch_mutex);
3786         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3787         ret = __rbd_register_watch(rbd_dev);
3788         if (ret)
3789                 goto out;
3790
3791         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3792         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3793
3794 out:
3795         mutex_unlock(&rbd_dev->watch_mutex);
3796         return ret;
3797 }
3798
3799 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3800 {
3801         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3802
3803         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3804         cancel_work_sync(&rbd_dev->acquired_lock_work);
3805         cancel_work_sync(&rbd_dev->released_lock_work);
3806         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3807         cancel_work_sync(&rbd_dev->unlock_work);
3808 }
3809
3810 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3811 {
3812         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3813         cancel_tasks_sync(rbd_dev);
3814
3815         mutex_lock(&rbd_dev->watch_mutex);
3816         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3817                 __rbd_unregister_watch(rbd_dev);
3818         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3819         mutex_unlock(&rbd_dev->watch_mutex);
3820
3821         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3822 }
3823
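/*
 * Delayed work to re-establish the watch after an error.  If we held
 * the exclusive lock it is released first and re-acquired once the
 * watch is back.  On -EBLACKLISTED or -ENOENT the device is marked
 * blacklisted and waiters are woken so queued requests can fail out;
 * other errors simply reschedule this work.
 */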
3824 static void rbd_reregister_watch(struct work_struct *work)
3825 {
3826         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3827                                             struct rbd_device, watch_dwork);
3828         bool was_lock_owner = false;
3829         bool need_to_wake = false;
3830         int ret;
3831
3832         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3833
3834         down_write(&rbd_dev->lock_rwsem);
3835         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3836                 was_lock_owner = rbd_release_lock(rbd_dev);
3837
3838         mutex_lock(&rbd_dev->watch_mutex);
3839         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3840                 mutex_unlock(&rbd_dev->watch_mutex);
3841                 goto out;
3842         }
3843
3844         ret = __rbd_register_watch(rbd_dev);
3845         if (ret) {
3846                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3847                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3848                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3849                         need_to_wake = true;
3850                 } else {
3851                         queue_delayed_work(rbd_dev->task_wq,
3852                                            &rbd_dev->watch_dwork,
3853                                            RBD_RETRY_DELAY);
3854                 }
3855                 mutex_unlock(&rbd_dev->watch_mutex);
3856                 goto out;
3857         }
3858
3859         need_to_wake = true;
3860         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3861         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3862         mutex_unlock(&rbd_dev->watch_mutex);
3863
3864         ret = rbd_dev_refresh(rbd_dev);
3865         if (ret)
3866                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3867
3868         if (was_lock_owner) {
3869                 ret = rbd_try_lock(rbd_dev);
3870                 if (ret)
3871                         rbd_warn(rbd_dev, "reregistration lock failed: %d",
3872                                  ret);
3873         }
3874
3875 out:
3876         up_write(&rbd_dev->lock_rwsem);
3877         if (need_to_wake)
3878                 wake_requests(rbd_dev, true);
3879 }
3880
3881 /*
3882  * Synchronous osd object method call.  Returns the number of bytes
3883  * returned in the inbound buffer, or a negative error code.
3884  */
3885 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3886                              struct ceph_object_id *oid,
3887                              struct ceph_object_locator *oloc,
3888                              const char *method_name,
3889                              const void *outbound,
3890                              size_t outbound_size,
3891                              void *inbound,
3892                              size_t inbound_size)
3893 {
3894         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3895         struct page *req_page = NULL;
3896         struct page *reply_page;
3897         int ret;
3898
3899         /*
3900          * Method calls are ultimately read operations.  The result
3901          * should be placed into the inbound buffer provided.  They
3902          * also supply outbound data--parameters for the object
3903          * method.  Currently if this is present it will be a
3904          * snapshot id.
3905          */
3906         if (outbound) {
3907                 if (outbound_size > PAGE_SIZE)
3908                         return -E2BIG;
3909
3910                 req_page = alloc_page(GFP_KERNEL);
3911                 if (!req_page)
3912                         return -ENOMEM;
3913
3914                 memcpy(page_address(req_page), outbound, outbound_size);
3915         }
3916
3917         reply_page = alloc_page(GFP_KERNEL);
3918         if (!reply_page) {
3919                 if (req_page)
3920                         __free_page(req_page);
3921                 return -ENOMEM;
3922         }
3923
3924         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3925                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3926                              reply_page, &inbound_size);
3927         if (!ret) {
3928                 memcpy(inbound, page_address(reply_page), inbound_size);
3929                 ret = inbound_size;
3930         }
3931
3932         if (req_page)
3933                 __free_page(req_page);
3934         __free_page(reply_page);
3935         return ret;
3936 }
3937
3938 /*
3939  * lock_rwsem must be held for read
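 *
 * Sleep until the exclusive lock is acquired or the device is
 * blacklisted.  lock_rwsem is dropped around schedule() and re-taken
 * for read, so it is held again on return; each iteration (re)queues
 * lock_dwork to drive the acquisition.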
3940  */
3941 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3942 {
3943         DEFINE_WAIT(wait);
3944
3945         do {
3946                 /*
3947                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
3948                  * and cancel_delayed_work() in wake_requests().
3949                  */
3950                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3951                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3952                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3953                                           TASK_UNINTERRUPTIBLE);
3954                 up_read(&rbd_dev->lock_rwsem);
3955                 schedule();
3956                 down_read(&rbd_dev->lock_rwsem);
3957         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3958                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3959
3960         finish_wait(&rbd_dev->lock_waitq, &wait);
3961 }
3962
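/*
 * Per-request work function, run from rbd_wq.  Validates the request,
 * snapshots the mapping size (and the snap context for writes and
 * discards) under header_rwsem, waits for the exclusive lock when one
 * is required, then builds and submits the image request.  Errors are
 * reported back through blk_mq_end_request().
 */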
3963 static void rbd_queue_workfn(struct work_struct *work)
3964 {
3965         struct request *rq = blk_mq_rq_from_pdu(work);
3966         struct rbd_device *rbd_dev = rq->q->queuedata;
3967         struct rbd_img_request *img_request;
3968         struct ceph_snap_context *snapc = NULL;
3969         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3970         u64 length = blk_rq_bytes(rq);
3971         enum obj_operation_type op_type;
3972         u64 mapping_size;
3973         bool must_be_locked;
3974         int result;
3975
3976         switch (req_op(rq)) {
3977         case REQ_OP_DISCARD:
3978                 op_type = OBJ_OP_DISCARD;
3979                 break;
3980         case REQ_OP_WRITE:
3981                 op_type = OBJ_OP_WRITE;
3982                 break;
3983         case REQ_OP_READ:
3984                 op_type = OBJ_OP_READ;
3985                 break;
3986         default:
3987                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3988                 result = -EIO;
3989                 goto err;
3990         }
3991
3992         /* Ignore/skip any zero-length requests */
3993
3994         if (!length) {
3995                 dout("%s: zero-length request\n", __func__);
3996                 result = 0;
3997                 goto err_rq;
3998         }
3999
4000         /* Only reads are allowed to a read-only device */
4001
4002         if (op_type != OBJ_OP_READ) {
4003                 if (rbd_dev->mapping.read_only) {
4004                         result = -EROFS;
4005                         goto err_rq;
4006                 }
4007                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4008         }
4009
4010         /*
4011          * Quit early if the mapped snapshot no longer exists.  It's
4012          * still possible the snapshot will have disappeared by the
4013          * time our request arrives at the osd, but there's no sense in
4014          * sending it if we already know.
4015          */
4016         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4017                 dout("request for non-existent snapshot\n");
4018                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4019                 result = -ENXIO;
4020                 goto err_rq;
4021         }
4022
4023         if (offset && length > U64_MAX - offset + 1) {
4024                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4025                          length);
4026                 result = -EINVAL;
4027                 goto err_rq;    /* Shouldn't happen */
4028         }
4029
4030         blk_mq_start_request(rq);
4031
4032         down_read(&rbd_dev->header_rwsem);
4033         mapping_size = rbd_dev->mapping.size;
4034         if (op_type != OBJ_OP_READ) {
4035                 snapc = rbd_dev->header.snapc;
4036                 ceph_get_snap_context(snapc);
4037                 must_be_locked = rbd_is_lock_supported(rbd_dev);
4038         } else {
4039                 must_be_locked = rbd_dev->opts->lock_on_read &&
4040                                         rbd_is_lock_supported(rbd_dev);
4041         }
4042         up_read(&rbd_dev->header_rwsem);
4043
4044         if (offset + length > mapping_size) {
4045                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4046                          length, mapping_size);
4047                 result = -EIO;
4048                 goto err_rq;
4049         }
4050
4051         if (must_be_locked) {
4052                 down_read(&rbd_dev->lock_rwsem);
4053                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4054                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4055                         rbd_wait_state_locked(rbd_dev);
4056
4057                 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4058                         !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4059                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4060                         result = -EBLACKLISTED;
4061                         goto err_unlock;
4062                 }
4063         }
4064
4065         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4066                                              snapc);
4067         if (!img_request) {
4068                 result = -ENOMEM;
4069                 goto err_unlock;
4070         }
4071         img_request->rq = rq;
4072         snapc = NULL; /* img_request consumes a ref */
4073
4074         if (op_type == OBJ_OP_DISCARD)
4075                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4076                                               NULL);
4077         else
4078                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4079                                               rq->bio);
4080         if (result)
4081                 goto err_img_request;
4082
4083         result = rbd_img_request_submit(img_request);
4084         if (result)
4085                 goto err_img_request;
4086
4087         if (must_be_locked)
4088                 up_read(&rbd_dev->lock_rwsem);
4089         return;
4090
4091 err_img_request:
4092         rbd_img_request_put(img_request);
4093 err_unlock:
4094         if (must_be_locked)
4095                 up_read(&rbd_dev->lock_rwsem);
4096 err_rq:
4097         if (result)
4098                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4099                          obj_op_name(op_type), length, offset, result);
4100         ceph_put_snap_context(snapc);
4101 err:
4102         blk_mq_end_request(rq, result);
4103 }
4104
4105 static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4106                 const struct blk_mq_queue_data *bd)
4107 {
4108         struct request *rq = bd->rq;
4109         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4110
4111         queue_work(rbd_wq, work);
4112         return BLK_MQ_RQ_QUEUE_OK;
4113 }
4114
4115 static void rbd_free_disk(struct rbd_device *rbd_dev)
4116 {
4117         struct gendisk *disk = rbd_dev->disk;
4118
4119         if (!disk)
4120                 return;
4121
4122         rbd_dev->disk = NULL;
4123         if (disk->flags & GENHD_FL_UP) {
4124                 del_gendisk(disk);
4125                 if (disk->queue)
4126                         blk_cleanup_queue(disk->queue);
4127                 blk_mq_free_tag_set(&rbd_dev->tag_set);
4128         }
4129         put_disk(disk);
4130 }
4131
4132 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4133                              struct ceph_object_id *oid,
4134                              struct ceph_object_locator *oloc,
4135                              void *buf, int buf_len)
4136
4137 {
4138         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4139         struct ceph_osd_request *req;
4140         struct page **pages;
4141         int num_pages = calc_pages_for(0, buf_len);
4142         int ret;
4143
4144         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4145         if (!req)
4146                 return -ENOMEM;
4147
4148         ceph_oid_copy(&req->r_base_oid, oid);
4149         ceph_oloc_copy(&req->r_base_oloc, oloc);
4150         req->r_flags = CEPH_OSD_FLAG_READ;
4151
4152         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4153         if (ret)
4154                 goto out_req;
4155
4156         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4157         if (IS_ERR(pages)) {
4158                 ret = PTR_ERR(pages);
4159                 goto out_req;
4160         }
4161
4162         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4163         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4164                                          true);
4165
4166         ceph_osdc_start_request(osdc, req, false);
4167         ret = ceph_osdc_wait_request(osdc, req);
4168         if (ret >= 0)
4169                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4170
4171 out_req:
4172         ceph_osdc_put_request(req);
4173         return ret;
4174 }
4175
4176 /*
4177  * Read the complete header for the given rbd device.  On successful
4178  * return, the rbd_dev->header field will contain up-to-date
4179  * information about the image.
4180  */
4181 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4182 {
4183         struct rbd_image_header_ondisk *ondisk = NULL;
4184         u32 snap_count = 0;
4185         u64 names_size = 0;
4186         u32 want_count;
4187         int ret;
4188
4189         /*
4190          * The complete header will include an array of its 64-bit
4191          * snapshot ids, followed by the names of those snapshots as
4192          * a contiguous block of NUL-terminated strings.  Note that
4193          * the number of snapshots could change by the time we read
4194          * it in, in which case we re-read it.
4195          */
4196         do {
4197                 size_t size;
4198
4199                 kfree(ondisk);
4200
4201                 size = sizeof (*ondisk);
4202                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4203                 size += names_size;
4204                 ondisk = kmalloc(size, GFP_KERNEL);
4205                 if (!ondisk)
4206                         return -ENOMEM;
4207
4208                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4209                                         &rbd_dev->header_oloc, ondisk, size);
4210                 if (ret < 0)
4211                         goto out;
4212                 if ((size_t)ret < size) {
4213                         ret = -ENXIO;
4214                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4215                                 size, ret);
4216                         goto out;
4217                 }
4218                 if (!rbd_dev_ondisk_valid(ondisk)) {
4219                         ret = -ENXIO;
4220                         rbd_warn(rbd_dev, "invalid header");
4221                         goto out;
4222                 }
4223
4224                 names_size = le64_to_cpu(ondisk->snap_names_len);
4225                 want_count = snap_count;
4226                 snap_count = le32_to_cpu(ondisk->snap_count);
4227         } while (snap_count != want_count);
4228
4229         ret = rbd_header_from_disk(rbd_dev, ondisk);
4230 out:
4231         kfree(ondisk);
4232
4233         return ret;
4234 }
4235
4236 /*
4237  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4238  * has disappeared from the (just updated) snapshot context.
4239  */
4240 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4241 {
4242         u64 snap_id;
4243
4244         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4245                 return;
4246
4247         snap_id = rbd_dev->spec->snap_id;
4248         if (snap_id == CEPH_NOSNAP)
4249                 return;
4250
4251         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4252                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4253 }
4254
4255 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4256 {
4257         sector_t size;
4258
4259         /*
4260          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4261          * try to update its size.  If REMOVING is set, updating size
4262          * is just useless work since the device can't be opened.
4263          */
4264         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4265             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4266                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4267                 dout("setting size to %llu sectors", (unsigned long long)size);
4268                 set_capacity(rbd_dev->disk, size);
4269                 revalidate_disk(rbd_dev->disk);
4270         }
4271 }
4272
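/*
 * Re-read the image header (triggered by a header-update notification
 * or watch re-registration).  Re-checks the parent info in case the
 * image was flattened, revalidates the mapped snapshot's EXISTS flag,
 * and resizes the block device if the mapping size changed.
 */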
4273 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4274 {
4275         u64 mapping_size;
4276         int ret;
4277
4278         down_write(&rbd_dev->header_rwsem);
4279         mapping_size = rbd_dev->mapping.size;
4280
4281         ret = rbd_dev_header_info(rbd_dev);
4282         if (ret)
4283                 goto out;
4284
4285         /*
4286          * If there is a parent, see if it has disappeared due to the
4287          * mapped image getting flattened.
4288          */
4289         if (rbd_dev->parent) {
4290                 ret = rbd_dev_v2_parent_info(rbd_dev);
4291                 if (ret)
4292                         goto out;
4293         }
4294
4295         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4296                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4297         } else {
4298                 /* validate mapped snapshot's EXISTS flag */
4299                 rbd_exists_validate(rbd_dev);
4300         }
4301
4302 out:
4303         up_write(&rbd_dev->header_rwsem);
4304         if (!ret && mapping_size != rbd_dev->mapping.size)
4305                 rbd_dev_update_size(rbd_dev);
4306
4307         return ret;
4308 }
4309
4310 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4311                 unsigned int hctx_idx, unsigned int numa_node)
4312 {
4313         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4314
4315         INIT_WORK(work, rbd_queue_workfn);
4316         return 0;
4317 }
4318
4319 static const struct blk_mq_ops rbd_mq_ops = {
4320         .queue_rq       = rbd_queue_rq,
4321         .init_request   = rbd_init_request,
4322 };
4323
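/*
 * Allocate the gendisk and set up the blk-mq queue.  I/O limits
 * (max sectors, segment size, io_min/io_opt, discard granularity) are
 * all derived from the rbd object size so requests line up with
 * backing objects.
 */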
4324 static int rbd_init_disk(struct rbd_device *rbd_dev)
4325 {
4326         struct gendisk *disk;
4327         struct request_queue *q;
4328         u64 segment_size;
4329         int err;
4330
4331         /* create gendisk info */
4332         disk = alloc_disk(single_major ?
4333                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4334                           RBD_MINORS_PER_MAJOR);
4335         if (!disk)
4336                 return -ENOMEM;
4337
4338         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4339                  rbd_dev->dev_id);
4340         disk->major = rbd_dev->major;
4341         disk->first_minor = rbd_dev->minor;
4342         if (single_major)
4343                 disk->flags |= GENHD_FL_EXT_DEVT;
4344         disk->fops = &rbd_bd_ops;
4345         disk->private_data = rbd_dev;
4346
4347         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4348         rbd_dev->tag_set.ops = &rbd_mq_ops;
4349         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4350         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4351         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4352         rbd_dev->tag_set.nr_hw_queues = 1;
4353         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4354
4355         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4356         if (err)
4357                 goto out_disk;
4358
4359         q = blk_mq_init_queue(&rbd_dev->tag_set);
4360         if (IS_ERR(q)) {
4361                 err = PTR_ERR(q);
4362                 goto out_tag_set;
4363         }
4364
4365         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4366         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4367
4368         /* set io sizes to object size */
4369         segment_size = rbd_obj_bytes(&rbd_dev->header);
4370         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4371         q->limits.max_sectors = queue_max_hw_sectors(q);
4372         blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4373         blk_queue_max_segment_size(q, segment_size);
4374         blk_queue_io_min(q, segment_size);
4375         blk_queue_io_opt(q, segment_size);
4376
4377         /* enable the discard support */
4378         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4379         q->limits.discard_granularity = segment_size;
4380         q->limits.discard_alignment = segment_size;
4381         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4382
4383         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4384                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4385
4386         disk->queue = q;
4387
4388         q->queuedata = rbd_dev;
4389
4390         rbd_dev->disk = disk;
4391
4392         return 0;
4393 out_tag_set:
4394         blk_mq_free_tag_set(&rbd_dev->tag_set);
4395 out_disk:
4396         put_disk(disk);
4397         return err;
4398 }
4399
4400 /*
4401   sysfs
4402 */
4403
4404 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4405 {
4406         return container_of(dev, struct rbd_device, dev);
4407 }
4408
4409 static ssize_t rbd_size_show(struct device *dev,
4410                              struct device_attribute *attr, char *buf)
4411 {
4412         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4413
4414         return sprintf(buf, "%llu\n",
4415                 (unsigned long long)rbd_dev->mapping.size);
4416 }
4417
4418 /*
4419  * Note this shows the features for whatever's mapped, which is not
4420  * necessarily the base image.
4421  */
4422 static ssize_t rbd_features_show(struct device *dev,
4423                              struct device_attribute *attr, char *buf)
4424 {
4425         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4426
4427         return sprintf(buf, "0x%016llx\n",
4428                         (unsigned long long)rbd_dev->mapping.features);
4429 }
4430
4431 static ssize_t rbd_major_show(struct device *dev,
4432                               struct device_attribute *attr, char *buf)
4433 {
4434         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4435
4436         if (rbd_dev->major)
4437                 return sprintf(buf, "%d\n", rbd_dev->major);
4438
4439         return sprintf(buf, "(none)\n");
4440 }
4441
4442 static ssize_t rbd_minor_show(struct device *dev,
4443                               struct device_attribute *attr, char *buf)
4444 {
4445         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4446
4447         return sprintf(buf, "%d\n", rbd_dev->minor);
4448 }
4449
4450 static ssize_t rbd_client_addr_show(struct device *dev,
4451                                     struct device_attribute *attr, char *buf)
4452 {
4453         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4454         struct ceph_entity_addr *client_addr =
4455             ceph_client_addr(rbd_dev->rbd_client->client);
4456
4457         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4458                        le32_to_cpu(client_addr->nonce));
4459 }
4460
4461 static ssize_t rbd_client_id_show(struct device *dev,
4462                                   struct device_attribute *attr, char *buf)
4463 {
4464         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4465
4466         return sprintf(buf, "client%lld\n",
4467                        ceph_client_gid(rbd_dev->rbd_client->client));
4468 }
4469
4470 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4471                                      struct device_attribute *attr, char *buf)
4472 {
4473         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4474
4475         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4476 }
4477
4478 static ssize_t rbd_config_info_show(struct device *dev,
4479                                     struct device_attribute *attr, char *buf)
4480 {
4481         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4482
4483         return sprintf(buf, "%s\n", rbd_dev->config_info);
4484 }
4485
4486 static ssize_t rbd_pool_show(struct device *dev,
4487                              struct device_attribute *attr, char *buf)
4488 {
4489         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4490
4491         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4492 }
4493
4494 static ssize_t rbd_pool_id_show(struct device *dev,
4495                              struct device_attribute *attr, char *buf)
4496 {
4497         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4498
4499         return sprintf(buf, "%llu\n",
4500                         (unsigned long long) rbd_dev->spec->pool_id);
4501 }
4502
4503 static ssize_t rbd_name_show(struct device *dev,
4504                              struct device_attribute *attr, char *buf)
4505 {
4506         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4507
4508         if (rbd_dev->spec->image_name)
4509                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4510
4511         return sprintf(buf, "(unknown)\n");
4512 }
4513
4514 static ssize_t rbd_image_id_show(struct device *dev,
4515                              struct device_attribute *attr, char *buf)
4516 {
4517         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4518
4519         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4520 }
4521
4522 /*
4523  * Shows the name of the currently-mapped snapshot (or
4524  * RBD_SNAP_HEAD_NAME for the base image).
4525  */
4526 static ssize_t rbd_snap_show(struct device *dev,
4527                              struct device_attribute *attr,
4528                              char *buf)
4529 {
4530         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4531
4532         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4533 }
4534
4535 static ssize_t rbd_snap_id_show(struct device *dev,
4536                                 struct device_attribute *attr, char *buf)
4537 {
4538         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4539
4540         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4541 }
4542
4543 /*
4544  * For a v2 image, shows the chain of parent images, separated by empty
4545  * lines.  For v1 images or if there is no parent, shows "(no parent
4546  * image)".
4547  */
4548 static ssize_t rbd_parent_show(struct device *dev,
4549                                struct device_attribute *attr,
4550                                char *buf)
4551 {
4552         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4553         ssize_t count = 0;
4554
4555         if (!rbd_dev->parent)
4556                 return sprintf(buf, "(no parent image)\n");
4557
4558         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4559                 struct rbd_spec *spec = rbd_dev->parent_spec;
4560
4561                 count += sprintf(&buf[count], "%s"
4562                             "pool_id %llu\npool_name %s\n"
4563                             "image_id %s\nimage_name %s\n"
4564                             "snap_id %llu\nsnap_name %s\n"
4565                             "overlap %llu\n",
4566                             !count ? "" : "\n", /* first? */
4567                             spec->pool_id, spec->pool_name,
4568                             spec->image_id, spec->image_name ?: "(unknown)",
4569                             spec->snap_id, spec->snap_name,
4570                             rbd_dev->parent_overlap);
4571         }
4572
4573         return count;
4574 }
4575
4576 static ssize_t rbd_image_refresh(struct device *dev,
4577                                  struct device_attribute *attr,
4578                                  const char *buf,
4579                                  size_t size)
4580 {
4581         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4582         int ret;
4583
4584         ret = rbd_dev_refresh(rbd_dev);
4585         if (ret)
4586                 return ret;
4587
4588         return size;
4589 }
4590
4591 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4592 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4593 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4594 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4595 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4596 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4597 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4598 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4599 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4600 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4601 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4602 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4603 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4604 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4605 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4606 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4607
4608 static struct attribute *rbd_attrs[] = {
4609         &dev_attr_size.attr,
4610         &dev_attr_features.attr,
4611         &dev_attr_major.attr,
4612         &dev_attr_minor.attr,
4613         &dev_attr_client_addr.attr,
4614         &dev_attr_client_id.attr,
4615         &dev_attr_cluster_fsid.attr,
4616         &dev_attr_config_info.attr,
4617         &dev_attr_pool.attr,
4618         &dev_attr_pool_id.attr,
4619         &dev_attr_name.attr,
4620         &dev_attr_image_id.attr,
4621         &dev_attr_current_snap.attr,
4622         &dev_attr_snap_id.attr,
4623         &dev_attr_parent.attr,
4624         &dev_attr_refresh.attr,
4625         NULL
4626 };
4627
4628 static struct attribute_group rbd_attr_group = {
4629         .attrs = rbd_attrs,
4630 };
4631
4632 static const struct attribute_group *rbd_attr_groups[] = {
4633         &rbd_attr_group,
4634         NULL
4635 };
4636
4637 static void rbd_dev_release(struct device *dev);
4638
4639 static const struct device_type rbd_device_type = {
4640         .name           = "rbd",
4641         .groups         = rbd_attr_groups,
4642         .release        = rbd_dev_release,
4643 };
4644
4645 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4646 {
4647         kref_get(&spec->kref);
4648
4649         return spec;
4650 }
4651
4652 static void rbd_spec_free(struct kref *kref);
4653 static void rbd_spec_put(struct rbd_spec *spec)
4654 {
4655         if (spec)
4656                 kref_put(&spec->kref, rbd_spec_free);
4657 }
4658
4659 static struct rbd_spec *rbd_spec_alloc(void)
4660 {
4661         struct rbd_spec *spec;
4662
4663         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4664         if (!spec)
4665                 return NULL;
4666
4667         spec->pool_id = CEPH_NOPOOL;
4668         spec->snap_id = CEPH_NOSNAP;
4669         kref_init(&spec->kref);
4670
4671         return spec;
4672 }
4673
4674 static void rbd_spec_free(struct kref *kref)
4675 {
4676         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4677
4678         kfree(spec->pool_name);
4679         kfree(spec->image_id);
4680         kfree(spec->image_name);
4681         kfree(spec->snap_name);
4682         kfree(spec);
4683 }
4684
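/*
 * Free an rbd_device and everything it still owns.  The watch must
 * already be unregistered and the exclusive lock released.
 */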
4685 static void rbd_dev_free(struct rbd_device *rbd_dev)
4686 {
4687         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4688         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4689
4690         ceph_oid_destroy(&rbd_dev->header_oid);
4691         ceph_oloc_destroy(&rbd_dev->header_oloc);
4692         kfree(rbd_dev->config_info);
4693
4694         rbd_put_client(rbd_dev->rbd_client);
4695         rbd_spec_put(rbd_dev->spec);
4696         kfree(rbd_dev->opts);
4697         kfree(rbd_dev);
4698 }
4699
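/*
 * Release callback for the embedded struct device, invoked when the
 * last reference to rbd_dev->dev is dropped.  Mapping devices (those
 * with opts) also give back their id, workqueue and module reference.
 */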
4700 static void rbd_dev_release(struct device *dev)
4701 {
4702         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4703         bool need_put = !!rbd_dev->opts;
4704
4705         if (need_put) {
4706                 destroy_workqueue(rbd_dev->task_wq);
4707                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4708         }
4709
4710         rbd_dev_free(rbd_dev);
4711
4712         /*
4713          * This is racy, but way better than doing the module_put() outside
4714          * of the release callback.  The race window is pretty small, so
4715          * doing something similar to dm (dm-builtin.c) is overkill.
4716          */
4717         if (need_put)
4718                 module_put(THIS_MODULE);
4719 }
4720
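/*
 * Allocate and initialize a bare rbd_device: locks, work items and
 * the embedded struct device.  rbdc and spec are stored as-is; the
 * caller is responsible for the corresponding references.
 */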
4721 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4722                                            struct rbd_spec *spec)
4723 {
4724         struct rbd_device *rbd_dev;
4725
4726         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4727         if (!rbd_dev)
4728                 return NULL;
4729
4730         spin_lock_init(&rbd_dev->lock);
4731         INIT_LIST_HEAD(&rbd_dev->node);
4732         init_rwsem(&rbd_dev->header_rwsem);
4733
4734         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4735         ceph_oid_init(&rbd_dev->header_oid);
4736         rbd_dev->header_oloc.pool = spec->pool_id;
4737
4738         mutex_init(&rbd_dev->watch_mutex);
4739         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4740         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4741
4742         init_rwsem(&rbd_dev->lock_rwsem);
4743         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4744         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4745         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4746         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4747         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4748         init_waitqueue_head(&rbd_dev->lock_waitq);
4749
4750         rbd_dev->dev.bus = &rbd_bus_type;
4751         rbd_dev->dev.type = &rbd_device_type;
4752         rbd_dev->dev.parent = &rbd_root_dev;
4753         device_initialize(&rbd_dev->dev);
4754
4755         rbd_dev->rbd_client = rbdc;
4756         rbd_dev->spec = spec;
4757
4758         return rbd_dev;
4759 }
4760
4761 /*
4762  * Create a mapping rbd_dev.
4763  */
4764 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4765                                          struct rbd_spec *spec,
4766                                          struct rbd_options *opts)
4767 {
4768         struct rbd_device *rbd_dev;
4769
4770         rbd_dev = __rbd_dev_create(rbdc, spec);
4771         if (!rbd_dev)
4772                 return NULL;
4773
4774         rbd_dev->opts = opts;
4775
4776         /* get an id and fill in device name */
4777         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4778                                          minor_to_rbd_dev_id(1 << MINORBITS),
4779                                          GFP_KERNEL);
4780         if (rbd_dev->dev_id < 0)
4781                 goto fail_rbd_dev;
4782
4783         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4784         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4785                                                    rbd_dev->name);
4786         if (!rbd_dev->task_wq)
4787                 goto fail_dev_id;
4788
4789         /* we have a ref from do_rbd_add() */
4790         __module_get(THIS_MODULE);
4791
4792         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4793         return rbd_dev;
4794
4795 fail_dev_id:
4796         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4797 fail_rbd_dev:
4798         rbd_dev_free(rbd_dev);
4799         return NULL;
4800 }
4801
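/*
 * Drop a reference on the embedded device; final teardown happens in
 * rbd_dev_release() once the last reference is gone.
 */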
4802 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4803 {
4804         if (rbd_dev)
4805                 put_device(&rbd_dev->dev);
4806 }
4807
4808 /*
4809  * Get the size and object order for an image snapshot, or if
4810  * snap_id is CEPH_NOSNAP, get this information for the base
4811  * image.
4812  */
4813 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4814                                 u8 *order, u64 *snap_size)
4815 {
4816         __le64 snapid = cpu_to_le64(snap_id);
4817         int ret;
4818         struct {
4819                 u8 order;
4820                 __le64 size;
4821         } __attribute__ ((packed)) size_buf = { 0 };
4822
4823         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4824                                   &rbd_dev->header_oloc, "get_size",
4825                                   &snapid, sizeof(snapid),
4826                                   &size_buf, sizeof(size_buf));
4827         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4828         if (ret < 0)
4829                 return ret;
4830         if (ret < sizeof (size_buf))
4831                 return -ERANGE;
4832
4833         if (order) {
4834                 *order = size_buf.order;
4835                 dout("  order %u", (unsigned int)*order);
4836         }
4837         *snap_size = le64_to_cpu(size_buf.size);
4838
4839         dout("  snap_id 0x%016llx snap_size = %llu\n",
4840                 (unsigned long long)snap_id,
4841                 (unsigned long long)*snap_size);
4842
4843         return 0;
4844 }
4845
4846 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4847 {
4848         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4849                                         &rbd_dev->header.obj_order,
4850                                         &rbd_dev->header.image_size);
4851 }
4852
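/*
 * Fetch the object name prefix for a format 2 image via the
 * "get_object_prefix" class method and store it in the in-core header.
 */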
4853 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4854 {
4855         void *reply_buf;
4856         int ret;
4857         void *p;
4858
4859         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4860         if (!reply_buf)
4861                 return -ENOMEM;
4862
4863         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4864                                   &rbd_dev->header_oloc, "get_object_prefix",
4865                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4866         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4867         if (ret < 0)
4868                 goto out;
4869
4870         p = reply_buf;
4871         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4872                                                 p + ret, NULL, GFP_NOIO);
4873         ret = 0;
4874
4875         if (IS_ERR(rbd_dev->header.object_prefix)) {
4876                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4877                 rbd_dev->header.object_prefix = NULL;
4878         } else {
4879                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4880         }
4881 out:
4882         kfree(reply_buf);
4883
4884         return ret;
4885 }
4886
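/*
 * Get the feature bits for an image snapshot, or for the base image
 * if snap_id is CEPH_NOSNAP.  Fails with -ENXIO if the image uses
 * incompatible features this driver does not support.
 */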
4887 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4888                 u64 *snap_features)
4889 {
4890         __le64 snapid = cpu_to_le64(snap_id);
4891         struct {
4892                 __le64 features;
4893                 __le64 incompat;
4894         } __attribute__ ((packed)) features_buf = { 0 };
4895         u64 unsup;
4896         int ret;
4897
4898         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4899                                   &rbd_dev->header_oloc, "get_features",
4900                                   &snapid, sizeof(snapid),
4901                                   &features_buf, sizeof(features_buf));
4902         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4903         if (ret < 0)
4904                 return ret;
4905         if (ret < sizeof (features_buf))
4906                 return -ERANGE;
4907
4908         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4909         if (unsup) {
4910                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4911                          unsup);
4912                 return -ENXIO;
4913         }
4914
4915         *snap_features = le64_to_cpu(features_buf.features);
4916
4917         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4918                 (unsigned long long)snap_id,
4919                 (unsigned long long)*snap_features,
4920                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4921
4922         return 0;
4923 }
4924
4925 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4926 {
4927         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4928                                                 &rbd_dev->header.features);
4929 }
4930
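/*
 * Query the parent of a layered format 2 image ("get_parent" class
 * method) and record the parent spec and overlap.  If the parent has
 * disappeared (e.g. the clone was flattened) the overlap is cleared.
 */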
4931 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4932 {
4933         struct rbd_spec *parent_spec;
4934         size_t size;
4935         void *reply_buf = NULL;
4936         __le64 snapid;
4937         void *p;
4938         void *end;
4939         u64 pool_id;
4940         char *image_id;
4941         u64 snap_id;
4942         u64 overlap;
4943         int ret;
4944
4945         parent_spec = rbd_spec_alloc();
4946         if (!parent_spec)
4947                 return -ENOMEM;
4948
4949         size = sizeof (__le64) +                                /* pool_id */
4950                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
4951                 sizeof (__le64) +                               /* snap_id */
4952                 sizeof (__le64);                                /* overlap */
4953         reply_buf = kmalloc(size, GFP_KERNEL);
4954         if (!reply_buf) {
4955                 ret = -ENOMEM;
4956                 goto out_err;
4957         }
4958
4959         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4960         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4961                                   &rbd_dev->header_oloc, "get_parent",
4962                                   &snapid, sizeof(snapid), reply_buf, size);
4963         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4964         if (ret < 0)
4965                 goto out_err;
4966
4967         p = reply_buf;
4968         end = reply_buf + ret;
4969         ret = -ERANGE;
4970         ceph_decode_64_safe(&p, end, pool_id, out_err);
4971         if (pool_id == CEPH_NOPOOL) {
4972                 /*
4973                  * Either the parent never existed, or we have a
4974                  * record of it but the image got flattened so it no
4975                  * longer has a parent.  When the parent of a
4976                  * layered image disappears we immediately set the
4977                  * overlap to 0.  The effect of this is that all new
4978                  * requests will be treated as if the image had no
4979                  * parent.
4980                  */
4981                 if (rbd_dev->parent_overlap) {
4982                         rbd_dev->parent_overlap = 0;
4983                         rbd_dev_parent_put(rbd_dev);
4984                         pr_info("%s: clone image has been flattened\n",
4985                                 rbd_dev->disk->disk_name);
4986                 }
4987
4988                 goto out;       /* No parent?  No problem. */
4989         }
4990
4991         /* The ceph file layout needs to fit pool id in 32 bits */
4992
4993         ret = -EIO;
4994         if (pool_id > (u64)U32_MAX) {
4995                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4996                         (unsigned long long)pool_id, U32_MAX);
4997                 goto out_err;
4998         }
4999
5000         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5001         if (IS_ERR(image_id)) {
5002                 ret = PTR_ERR(image_id);
5003                 goto out_err;
5004         }
5005         ceph_decode_64_safe(&p, end, snap_id, out_err);
5006         ceph_decode_64_safe(&p, end, overlap, out_err);
5007
5008         /*
5009          * The parent won't change (except when the clone is
5010          * flattened, which was handled above).  So we only need to
5011          * record the parent spec if we haven't already done so.
5012          */
5013         if (!rbd_dev->parent_spec) {
5014                 parent_spec->pool_id = pool_id;
5015                 parent_spec->image_id = image_id;
5016                 parent_spec->snap_id = snap_id;
5017                 rbd_dev->parent_spec = parent_spec;
5018                 parent_spec = NULL;     /* rbd_dev now owns this */
5019         } else {
5020                 kfree(image_id);
5021         }
5022
5023         /*
5024          * We always update the parent overlap.  If it's zero we issue
5025          * a warning, as we will proceed as if there was no parent.
5026          */
5027         if (!overlap) {
5028                 if (parent_spec) {
5029                         /* refresh, careful to warn just once */
5030                         if (rbd_dev->parent_overlap)
5031                                 rbd_warn(rbd_dev,
5032                                     "clone now standalone (overlap became 0)");
5033                 } else {
5034                         /* initial probe */
5035                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5036                 }
5037         }
5038         rbd_dev->parent_overlap = overlap;
5039
5040 out:
5041         ret = 0;
5042 out_err:
5043         kfree(reply_buf);
5044         rbd_spec_put(parent_spec);
5045
5046         return ret;
5047 }
5048
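/*
 * Read the stripe unit and count ("get_stripe_unit_count").  Fancy
 * striping isn't supported, so anything other than the defaults
 * (stripe unit == object size, stripe count == 1) is rejected.
 */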
5049 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5050 {
5051         struct {
5052                 __le64 stripe_unit;
5053                 __le64 stripe_count;
5054         } __attribute__ ((packed)) striping_info_buf = { 0 };
5055         size_t size = sizeof (striping_info_buf);
5056         void *p;
5057         u64 obj_size;
5058         u64 stripe_unit;
5059         u64 stripe_count;
5060         int ret;
5061
5062         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5063                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5064                                 NULL, 0, &striping_info_buf, size);
5065         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5066         if (ret < 0)
5067                 return ret;
5068         if (ret < size)
5069                 return -ERANGE;
5070
5071         /*
5072          * We don't actually support the "fancy striping" feature
5073          * (STRIPINGV2) yet, but if the striping sizes are the
5074          * defaults the behavior is the same as before.  So find
5075          * out, and only fail if the image has non-default values.
5076          */
5077         ret = -EINVAL;
5078         obj_size = rbd_obj_bytes(&rbd_dev->header);
5079         p = &striping_info_buf;
5080         stripe_unit = ceph_decode_64(&p);
5081         if (stripe_unit != obj_size) {
5082                 rbd_warn(rbd_dev, "unsupported stripe unit "
5083                                 "(got %llu want %llu)",
5084                                 stripe_unit, obj_size);
5085                 return -EINVAL;
5086         }
5087         stripe_count = ceph_decode_64(&p);
5088         if (stripe_count != 1) {
5089                 rbd_warn(rbd_dev, "unsupported stripe count "
5090                                 "(got %llu want 1)", stripe_count);
5091                 return -EINVAL;
5092         }
5093         rbd_dev->header.stripe_unit = stripe_unit;
5094         rbd_dev->header.stripe_count = stripe_count;
5095
5096         return 0;
5097 }
5098
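/* Look up the id of the separate data pool used by DATA_POOL images. */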
5099 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5100 {
5101         __le64 data_pool_id;
5102         int ret;
5103
5104         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5105                                   &rbd_dev->header_oloc, "get_data_pool",
5106                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5107         if (ret < 0)
5108                 return ret;
5109         if (ret < sizeof(data_pool_id))
5110                 return -EBADMSG;
5111
5112         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5113         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5114         return 0;
5115 }
5116
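/*
 * Map an image id back to its name by asking the rbd directory object
 * ("dir_get_name").  Returns a dynamically allocated name, or NULL if
 * it can't be determined.
 */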
5117 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5118 {
5119         CEPH_DEFINE_OID_ONSTACK(oid);
5120         size_t image_id_size;
5121         char *image_id;
5122         void *p;
5123         void *end;
5124         size_t size;
5125         void *reply_buf = NULL;
5126         size_t len = 0;
5127         char *image_name = NULL;
5128         int ret;
5129
5130         rbd_assert(!rbd_dev->spec->image_name);
5131
5132         len = strlen(rbd_dev->spec->image_id);
5133         image_id_size = sizeof (__le32) + len;
5134         image_id = kmalloc(image_id_size, GFP_KERNEL);
5135         if (!image_id)
5136                 return NULL;
5137
5138         p = image_id;
5139         end = image_id + image_id_size;
5140         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5141
5142         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5143         reply_buf = kmalloc(size, GFP_KERNEL);
5144         if (!reply_buf)
5145                 goto out;
5146
5147         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5148         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5149                                   "dir_get_name", image_id, image_id_size,
5150                                   reply_buf, size);
5151         if (ret < 0)
5152                 goto out;
5153         p = reply_buf;
5154         end = reply_buf + ret;
5155
5156         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5157         if (IS_ERR(image_name))
5158                 image_name = NULL;
5159         else
5160                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5161 out:
5162         kfree(reply_buf);
5163         kfree(image_id);
5164
5165         return image_name;
5166 }
5167
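/*
 * Find the id of the named snapshot in a format 1 image's snapshot
 * context.  Returns CEPH_NOSNAP if there is no such snapshot.
 */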
5168 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5169 {
5170         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5171         const char *snap_name;
5172         u32 which = 0;
5173
5174         /* Skip over names until we find the one we are looking for */
5175
5176         snap_name = rbd_dev->header.snap_names;
5177         while (which < snapc->num_snaps) {
5178                 if (!strcmp(name, snap_name))
5179                         return snapc->snaps[which];
5180                 snap_name += strlen(snap_name) + 1;
5181                 which++;
5182         }
5183         return CEPH_NOSNAP;
5184 }
5185
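/*
 * Same lookup for a format 2 image; snapshot names have to be fetched
 * from the OSD one at a time.
 */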
5186 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5187 {
5188         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5189         u32 which;
5190         bool found = false;
5191         u64 snap_id;
5192
5193         for (which = 0; !found && which < snapc->num_snaps; which++) {
5194                 const char *snap_name;
5195
5196                 snap_id = snapc->snaps[which];
5197                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5198                 if (IS_ERR(snap_name)) {
5199                         /* ignore no-longer existing snapshots */
5200                         if (PTR_ERR(snap_name) == -ENOENT)
5201                                 continue;
5202                         else
5203                                 break;
5204                 }
5205                 found = !strcmp(name, snap_name);
5206                 kfree(snap_name);
5207         }
5208         return found ? snap_id : CEPH_NOSNAP;
5209 }
5210
5211 /*
5212  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5213  * no snapshot by that name is found, or if an error occurs.
5214  */
5215 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5216 {
5217         if (rbd_dev->image_format == 1)
5218                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5219
5220         return rbd_v2_snap_id_by_name(rbd_dev, name);
5221 }
5222
5223 /*
5224  * An image being mapped will have everything but the snap id.
5225  */
5226 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5227 {
5228         struct rbd_spec *spec = rbd_dev->spec;
5229
5230         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5231         rbd_assert(spec->image_id && spec->image_name);
5232         rbd_assert(spec->snap_name);
5233
5234         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5235                 u64 snap_id;
5236
5237                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5238                 if (snap_id == CEPH_NOSNAP)
5239                         return -ENOENT;
5240
5241                 spec->snap_id = snap_id;
5242         } else {
5243                 spec->snap_id = CEPH_NOSNAP;
5244         }
5245
5246         return 0;
5247 }
5248
5249 /*
5250  * A parent image will have all ids but none of the names.
5251  *
5252  * All names in an rbd spec are dynamically allocated.  It's OK if we
5253  * can't figure out the name for an image id.
5254  */
5255 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5256 {
5257         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5258         struct rbd_spec *spec = rbd_dev->spec;
5259         const char *pool_name;
5260         const char *image_name;
5261         const char *snap_name;
5262         int ret;
5263
5264         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5265         rbd_assert(spec->image_id);
5266         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5267
5268         /* Get the pool name; we have to make our own copy of this */
5269
5270         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5271         if (!pool_name) {
5272                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5273                 return -EIO;
5274         }
5275         pool_name = kstrdup(pool_name, GFP_KERNEL);
5276         if (!pool_name)
5277                 return -ENOMEM;
5278
5279         /* Fetch the image name; tolerate failure here */
5280
5281         image_name = rbd_dev_image_name(rbd_dev);
5282         if (!image_name)
5283                 rbd_warn(rbd_dev, "unable to get image name");
5284
5285         /* Fetch the snapshot name */
5286
5287         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5288         if (IS_ERR(snap_name)) {
5289                 ret = PTR_ERR(snap_name);
5290                 goto out_err;
5291         }
5292
5293         spec->pool_name = pool_name;
5294         spec->image_name = image_name;
5295         spec->snap_name = snap_name;
5296
5297         return 0;
5298
5299 out_err:
5300         kfree(image_name);
5301         kfree(pool_name);
5302         return ret;
5303 }
5304
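/*
 * Fetch the image's snapshot context (seq plus snapshot ids) via
 * "get_snapcontext" and install it in the in-core header.
 */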
5305 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5306 {
5307         size_t size;
5308         int ret;
5309         void *reply_buf;
5310         void *p;
5311         void *end;
5312         u64 seq;
5313         u32 snap_count;
5314         struct ceph_snap_context *snapc;
5315         u32 i;
5316
5317         /*
5318          * We'll need room for the seq value (maximum snapshot id),
5319          * snapshot count, and array of that many snapshot ids.
5320          * For now we have a fixed upper limit on the number we're
5321          * prepared to receive.
5322          */
5323         size = sizeof (__le64) + sizeof (__le32) +
5324                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5325         reply_buf = kzalloc(size, GFP_KERNEL);
5326         if (!reply_buf)
5327                 return -ENOMEM;
5328
5329         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5330                                   &rbd_dev->header_oloc, "get_snapcontext",
5331                                   NULL, 0, reply_buf, size);
5332         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5333         if (ret < 0)
5334                 goto out;
5335
5336         p = reply_buf;
5337         end = reply_buf + ret;
5338         ret = -ERANGE;
5339         ceph_decode_64_safe(&p, end, seq, out);
5340         ceph_decode_32_safe(&p, end, snap_count, out);
5341
5342         /*
5343          * Make sure the reported number of snapshot ids wouldn't go
5344          * beyond the end of our buffer.  But before checking that,
5345          * make sure the computed size of the snapshot context we
5346          * allocate is representable in a size_t.
5347          */
5348         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5349                                  / sizeof (u64)) {
5350                 ret = -EINVAL;
5351                 goto out;
5352         }
5353         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5354                 goto out;
5355         ret = 0;
5356
5357         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5358         if (!snapc) {
5359                 ret = -ENOMEM;
5360                 goto out;
5361         }
5362         snapc->seq = seq;
5363         for (i = 0; i < snap_count; i++)
5364                 snapc->snaps[i] = ceph_decode_64(&p);
5365
5366         ceph_put_snap_context(rbd_dev->header.snapc);
5367         rbd_dev->header.snapc = snapc;
5368
5369         dout("  snap context seq = %llu, snap_count = %u\n",
5370                 (unsigned long long)seq, (unsigned int)snap_count);
5371 out:
5372         kfree(reply_buf);
5373
5374         return ret;
5375 }
5376
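/*
 * Fetch the name of a single snapshot by id ("get_snapshot_name").
 * Returns a dynamically allocated string on success or an ERR_PTR()
 * on failure.
 */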
5377 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5378                                         u64 snap_id)
5379 {
5380         size_t size;
5381         void *reply_buf;
5382         __le64 snapid;
5383         int ret;
5384         void *p;
5385         void *end;
5386         char *snap_name;
5387
5388         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5389         reply_buf = kmalloc(size, GFP_KERNEL);
5390         if (!reply_buf)
5391                 return ERR_PTR(-ENOMEM);
5392
5393         snapid = cpu_to_le64(snap_id);
5394         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5395                                   &rbd_dev->header_oloc, "get_snapshot_name",
5396                                   &snapid, sizeof(snapid), reply_buf, size);
5397         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5398         if (ret < 0) {
5399                 snap_name = ERR_PTR(ret);
5400                 goto out;
5401         }
5402
5403         p = reply_buf;
5404         end = reply_buf + ret;
5405         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5406         if (IS_ERR(snap_name))
5407                 goto out;
5408
5409         dout("  snap_id 0x%016llx snap_name = %s\n",
5410                 (unsigned long long)snap_id, snap_name);
5411 out:
5412         kfree(reply_buf);
5413
5414         return snap_name;
5415 }
5416
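/*
 * Refresh format 2 header information: the image size on every call,
 * the one-time fields on the first call, and the snapshot context.
 */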
5417 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5418 {
5419         bool first_time = rbd_dev->header.object_prefix == NULL;
5420         int ret;
5421
5422         ret = rbd_dev_v2_image_size(rbd_dev);
5423         if (ret)
5424                 return ret;
5425
5426         if (first_time) {
5427                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5428                 if (ret)
5429                         return ret;
5430         }
5431
5432         ret = rbd_dev_v2_snap_context(rbd_dev);
5433         if (ret && first_time) {
5434                 kfree(rbd_dev->header.object_prefix);
5435                 rbd_dev->header.object_prefix = NULL;
5436         }
5437
5438         return ret;
5439 }
5440
5441 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5442 {
5443         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5444
5445         if (rbd_dev->image_format == 1)
5446                 return rbd_dev_v1_header_info(rbd_dev);
5447
5448         return rbd_dev_v2_header_info(rbd_dev);
5449 }
5450
5451 /*
5452  * Skips over white space at *buf, and updates *buf to point to the
5453  * first found non-space character (if any). Returns the length of
5454  * the token (string of non-white space characters) found.  Note
5455  * that *buf must be terminated with '\0'.
5456  */
5457 static inline size_t next_token(const char **buf)
5458 {
5459         /*
5460          * These are the characters that produce nonzero for
5461          * isspace() in the "C" and "POSIX" locales.
5462          */
5463         const char *spaces = " \f\n\r\t\v";
5464
5465         *buf += strspn(*buf, spaces);   /* Find start of token */
5466
5467         return strcspn(*buf, spaces);   /* Return token length */
5468 }
5469
5470 /*
5471  * Finds the next token in *buf, dynamically allocates a buffer big
5472  * enough to hold a copy of it, and copies the token into the new
5473  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5474  * that a duplicate buffer is created even for a zero-length token.
5475  *
5476  * Returns a pointer to the newly-allocated duplicate, or a null
5477  * pointer if memory for the duplicate was not available.  If
5478  * the lenp argument is a non-null pointer, the length of the token
5479  * (not including the '\0') is returned in *lenp.
5480  *
5481  * If successful, the *buf pointer will be updated to point beyond
5482  * the end of the found token.
5483  *
5484  * Note: uses GFP_KERNEL for allocation.
5485  */
5486 static inline char *dup_token(const char **buf, size_t *lenp)
5487 {
5488         char *dup;
5489         size_t len;
5490
5491         len = next_token(buf);
5492         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5493         if (!dup)
5494                 return NULL;
5495         *(dup + len) = '\0';
5496         *buf += len;
5497
5498         if (lenp)
5499                 *lenp = len;
5500
5501         return dup;
5502 }
5503
5504 /*
5505  * Parse the options provided for an "rbd add" (i.e., rbd image
5506  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5507  * and the data written is passed here via a NUL-terminated buffer.
5508  * Returns 0 if successful or an error code otherwise.
5509  *
5510  * The information extracted from these options is recorded in
5511  * the other parameters which return dynamically-allocated
5512  * structures:
5513  *  ceph_opts
5514  *      The address of a pointer that will refer to a ceph options
5515  *      structure.  Caller must release the returned pointer using
5516  *      ceph_destroy_options() when it is no longer needed.
5517  *  rbd_opts
5518  *      Address of an rbd options pointer.  Fully initialized by
5519  *      this function; caller must release with kfree().
5520  *  spec
5521  *      Address of an rbd image specification pointer.  Fully
5522  *      initialized by this function based on parsed options.
5523  *      Caller must release with rbd_spec_put().
5524  *
5525  * The options passed take this form:
5526  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5527  * where:
5528  *  <mon_addrs>
5529  *      A comma-separated list of one or more monitor addresses.
5530  *      A monitor address is an ip address, optionally followed
5531  *      by a port number (separated by a colon).
5532  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5533  *  <options>
5534  *      A comma-separated list of ceph and/or rbd options.
5535  *  <pool_name>
5536  *      The name of the rados pool containing the rbd image.
5537  *  <image_name>
5538  *      The name of the image in that pool to map.
5539  *  <snap_name>
5540  *      An optional snapshot name.  If provided, the mapping will
5541  *      present data from the image at the time that snapshot was
5542  *      created.  The image head is used if no snapshot name is
5543  *      provided.  Snapshot mappings are always read-only.
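 *
 * For example, a write like the following (illustrative values) would
 * map the head of image "foo" from pool "rbd":
 *
 *   1.2.3.4:6789 name=admin,secret=<key> rbd foo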
5544  */
5545 static int rbd_add_parse_args(const char *buf,
5546                                 struct ceph_options **ceph_opts,
5547                                 struct rbd_options **opts,
5548                                 struct rbd_spec **rbd_spec)
5549 {
5550         size_t len;
5551         char *options;
5552         const char *mon_addrs;
5553         char *snap_name;
5554         size_t mon_addrs_size;
5555         struct rbd_spec *spec = NULL;
5556         struct rbd_options *rbd_opts = NULL;
5557         struct ceph_options *copts;
5558         int ret;
5559
5560         /* The first four tokens are required */
5561
5562         len = next_token(&buf);
5563         if (!len) {
5564                 rbd_warn(NULL, "no monitor address(es) provided");
5565                 return -EINVAL;
5566         }
5567         mon_addrs = buf;
5568         mon_addrs_size = len + 1;
5569         buf += len;
5570
5571         ret = -EINVAL;
5572         options = dup_token(&buf, NULL);
5573         if (!options)
5574                 return -ENOMEM;
5575         if (!*options) {
5576                 rbd_warn(NULL, "no options provided");
5577                 goto out_err;
5578         }
5579
5580         spec = rbd_spec_alloc();
5581         if (!spec)
5582                 goto out_mem;
5583
5584         spec->pool_name = dup_token(&buf, NULL);
5585         if (!spec->pool_name)
5586                 goto out_mem;
5587         if (!*spec->pool_name) {
5588                 rbd_warn(NULL, "no pool name provided");
5589                 goto out_err;
5590         }
5591
5592         spec->image_name = dup_token(&buf, NULL);
5593         if (!spec->image_name)
5594                 goto out_mem;
5595         if (!*spec->image_name) {
5596                 rbd_warn(NULL, "no image name provided");
5597                 goto out_err;
5598         }
5599
5600         /*
5601          * Snapshot name is optional; default is to use "-"
5602          * (indicating the head/no snapshot).
5603          */
5604         len = next_token(&buf);
5605         if (!len) {
5606                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5607                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5608         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5609                 ret = -ENAMETOOLONG;
5610                 goto out_err;
5611         }
5612         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5613         if (!snap_name)
5614                 goto out_mem;
5615         *(snap_name + len) = '\0';
5616         spec->snap_name = snap_name;
5617
5618         /* Initialize all rbd options to the defaults */
5619
5620         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5621         if (!rbd_opts)
5622                 goto out_mem;
5623
5624         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5625         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5626         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5627
5628         copts = ceph_parse_options(options, mon_addrs,
5629                                         mon_addrs + mon_addrs_size - 1,
5630                                         parse_rbd_opts_token, rbd_opts);
5631         if (IS_ERR(copts)) {
5632                 ret = PTR_ERR(copts);
5633                 goto out_err;
5634         }
5635         kfree(options);
5636
5637         *ceph_opts = copts;
5638         *opts = rbd_opts;
5639         *rbd_spec = spec;
5640
5641         return 0;
5642 out_mem:
5643         ret = -ENOMEM;
5644 out_err:
5645         kfree(rbd_opts);
5646         rbd_spec_put(spec);
5647         kfree(options);
5648
5649         return ret;
5650 }
5651
5652 /*
5653  * Return pool id (>= 0) or a negative error code.
5654  */
5655 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5656 {
5657         struct ceph_options *opts = rbdc->client->options;
5658         u64 newest_epoch;
5659         int tries = 0;
5660         int ret;
5661
5662 again:
5663         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5664         if (ret == -ENOENT && tries++ < 1) {
5665                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5666                                             &newest_epoch);
5667                 if (ret < 0)
5668                         return ret;
5669
5670                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5671                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5672                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5673                                                      newest_epoch,
5674                                                      opts->mount_timeout);
5675                         goto again;
5676                 } else {
5677                         /* the osdmap we have is new enough */
5678                         return -ENOENT;
5679                 }
5680         }
5681
5682         return ret;
5683 }
5684
5685 /*
5686  * An rbd format 2 image has a unique identifier, distinct from the
5687  * name given to it by the user.  Internally, that identifier is
5688  * what's used to specify the names of objects related to the image.
5689  *
5690  * A special "rbd id" object is used to map an rbd image name to its
5691  * id.  If that object doesn't exist, then there is no v2 rbd image
5692  * with the supplied name.
5693  *
5694  * This function will record the given rbd_dev's image_id field if
5695  * it can be determined, and in that case will return 0.  If any
5696  * errors occur a negative errno will be returned and the rbd_dev's
5697  * image_id field will be unchanged (and should be NULL).
5698  */
5699 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5700 {
5701         int ret;
5702         size_t size;
5703         CEPH_DEFINE_OID_ONSTACK(oid);
5704         void *response;
5705         char *image_id;
5706
5707         /*
5708          * When probing a parent image, the image id is already
5709          * known (and the image name likely is not).  There's no
5710          * need to fetch the image id again in this case.  We
5711          * do still need to set the image format though.
5712          */
5713         if (rbd_dev->spec->image_id) {
5714                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5715
5716                 return 0;
5717         }
5718
5719         /*
5720          * First, see if the format 2 image id file exists, and if
5721          * so, get the image's persistent id from it.
5722          */
5723         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5724                                rbd_dev->spec->image_name);
5725         if (ret)
5726                 return ret;
5727
5728         dout("rbd id object name is %s\n", oid.name);
5729
5730         /* Response will be an encoded string, which includes a length */
5731
5732         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5733         response = kzalloc(size, GFP_NOIO);
5734         if (!response) {
5735                 ret = -ENOMEM;
5736                 goto out;
5737         }
5738
5739         /* If it doesn't exist we'll assume it's a format 1 image */
5740
5741         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5742                                   "get_id", NULL, 0,
5743                                   response, RBD_IMAGE_ID_LEN_MAX);
5744         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5745         if (ret == -ENOENT) {
5746                 image_id = kstrdup("", GFP_KERNEL);
5747                 ret = image_id ? 0 : -ENOMEM;
5748                 if (!ret)
5749                         rbd_dev->image_format = 1;
5750         } else if (ret >= 0) {
5751                 void *p = response;
5752
5753                 image_id = ceph_extract_encoded_string(&p, p + ret,
5754                                                 NULL, GFP_NOIO);
5755                 ret = PTR_ERR_OR_ZERO(image_id);
5756                 if (!ret)
5757                         rbd_dev->image_format = 2;
5758         }
5759
5760         if (!ret) {
5761                 rbd_dev->spec->image_id = image_id;
5762                 dout("image_id is %s\n", image_id);
5763         }
5764 out:
5765         kfree(response);
5766         ceph_oid_destroy(&oid);
5767         return ret;
5768 }
5769
5770 /*
5771  * Undo whatever state changes are made by a v1 or v2 header info
5772  * call.
5773  */
5774 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5775 {
5776         struct rbd_image_header *header;
5777
5778         rbd_dev_parent_put(rbd_dev);
5779
5780         /* Free dynamic fields from the header, then zero it out */
5781
5782         header = &rbd_dev->header;
5783         ceph_put_snap_context(header->snapc);
5784         kfree(header->snap_sizes);
5785         kfree(header->snap_names);
5786         kfree(header->object_prefix);
5787         memset(header, 0, sizeof (*header));
5788 }
5789
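/*
 * Fetch the format 2 header fields that never change: object prefix,
 * features, striping parameters and data pool (where applicable),
 * then set up the file layout.
 */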
5790 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5791 {
5792         int ret;
5793
5794         ret = rbd_dev_v2_object_prefix(rbd_dev);
5795         if (ret)
5796                 goto out_err;
5797
5798         /*
5799          * Get and check the features for the image.  Currently the
5800          * features are assumed to never change.
5801          */
5802         ret = rbd_dev_v2_features(rbd_dev);
5803         if (ret)
5804                 goto out_err;
5805
5806         /* If the image supports fancy striping, get its parameters */
5807
5808         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5809                 ret = rbd_dev_v2_striping_info(rbd_dev);
5810                 if (ret < 0)
5811                         goto out_err;
5812         }
5813
5814         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5815                 ret = rbd_dev_v2_data_pool(rbd_dev);
5816                 if (ret)
5817                         goto out_err;
5818         }
5819
5820         rbd_init_layout(rbd_dev);
5821         return 0;
5822
5823 out_err:
5824         rbd_dev->header.features = 0;
5825         kfree(rbd_dev->header.object_prefix);
5826         rbd_dev->header.object_prefix = NULL;
5827         return ret;
5828 }
5829
5830 /*
5831  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5832  * rbd_dev_image_probe() recursion depth, which means it's also the
5833  * length of the already discovered part of the parent chain.
5834  */
5835 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5836 {
5837         struct rbd_device *parent = NULL;
5838         int ret;
5839
5840         if (!rbd_dev->parent_spec)
5841                 return 0;
5842
5843         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5844                 pr_info("parent chain is too long (%d)\n", depth);
5845                 ret = -EINVAL;
5846                 goto out_err;
5847         }
5848
5849         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5850         if (!parent) {
5851                 ret = -ENOMEM;
5852                 goto out_err;
5853         }
5854
5855         /*
5856          * Images related by parent/child relationships always share
5857          * rbd_client and spec/parent_spec, so bump their refcounts.
5858          */
5859         __rbd_get_client(rbd_dev->rbd_client);
5860         rbd_spec_get(rbd_dev->parent_spec);
5861
5862         ret = rbd_dev_image_probe(parent, depth);
5863         if (ret < 0)
5864                 goto out_err;
5865
5866         rbd_dev->parent = parent;
5867         atomic_set(&rbd_dev->parent_ref, 1);
5868         return 0;
5869
5870 out_err:
5871         rbd_dev_unparent(rbd_dev);
5872         rbd_dev_destroy(parent);
5873         return ret;
5874 }
5875
5876 /*
5877  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5878  * upon return.
5879  */
5880 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5881 {
5882         int ret;
5883
5884         /* Record our major and minor device numbers. */
5885
5886         if (!single_major) {
5887                 ret = register_blkdev(0, rbd_dev->name);
5888                 if (ret < 0)
5889                         goto err_out_unlock;
5890
5891                 rbd_dev->major = ret;
5892                 rbd_dev->minor = 0;
5893         } else {
5894                 rbd_dev->major = rbd_major;
5895                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5896         }
5897
5898         /* Set up the blkdev mapping. */
5899
5900         ret = rbd_init_disk(rbd_dev);
5901         if (ret)
5902                 goto err_out_blkdev;
5903
5904         ret = rbd_dev_mapping_set(rbd_dev);
5905         if (ret)
5906                 goto err_out_disk;
5907
5908         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5909         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5910
5911         dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5912         ret = device_add(&rbd_dev->dev);
5913         if (ret)
5914                 goto err_out_mapping;
5915
5916         /* Everything's ready.  Announce the disk to the world. */
5917
5918         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5919         up_write(&rbd_dev->header_rwsem);
5920
5921         spin_lock(&rbd_dev_list_lock);
5922         list_add_tail(&rbd_dev->node, &rbd_dev_list);
5923         spin_unlock(&rbd_dev_list_lock);
5924
5925         add_disk(rbd_dev->disk);
5926         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5927                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5928                 rbd_dev->header.features);
5929
5930         return ret;
5931
5932 err_out_mapping:
5933         rbd_dev_mapping_clear(rbd_dev);
5934 err_out_disk:
5935         rbd_free_disk(rbd_dev);
5936 err_out_blkdev:
5937         if (!single_major)
5938                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5939 err_out_unlock:
5940         up_write(&rbd_dev->header_rwsem);
5941         return ret;
5942 }
5943
5944 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5945 {
5946         struct rbd_spec *spec = rbd_dev->spec;
5947         int ret;
5948
5949         /* Record the header object name for this rbd image. */
5950
5951         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5952         if (rbd_dev->image_format == 1)
5953                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5954                                        spec->image_name, RBD_SUFFIX);
5955         else
5956                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5957                                        RBD_HEADER_PREFIX, spec->image_id);
5958
5959         return ret;
5960 }
5961
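/*
 * Undo rbd_dev_image_probe(): discard the header state, image format
 * and image id, then drop the device reference.
 */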
5962 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5963 {
5964         rbd_dev_unprobe(rbd_dev);
5965         rbd_dev->image_format = 0;
5966         kfree(rbd_dev->spec->image_id);
5967         rbd_dev->spec->image_id = NULL;
5968
5969         rbd_dev_destroy(rbd_dev);
5970 }
5971
5972 /*
5973  * Probe for the existence of the header object for the given rbd device.
5974  * If this image is the one being mapped (i.e., depth is 0, not a parent in
5975  * the layering chain), initiate a watch on its header object before using
5976  * that object to get detailed information about the rbd image.
5977  */
5978 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5979 {
5980         int ret;
5981
5982         /*
5983          * Get the id from the image id object.  Unless there's an
5984          * error, rbd_dev->spec->image_id will be filled in with
5985          * a dynamically-allocated string, and rbd_dev->image_format
5986          * will be set to either 1 or 2.
5987          */
5988         ret = rbd_dev_image_id(rbd_dev);
5989         if (ret)
5990                 return ret;
5991
5992         ret = rbd_dev_header_name(rbd_dev);
5993         if (ret)
5994                 goto err_out_format;
5995
5996         if (!depth) {
5997                 ret = rbd_register_watch(rbd_dev);
5998                 if (ret) {
5999                         if (ret == -ENOENT)
6000                                 pr_info("image %s/%s does not exist\n",
6001                                         rbd_dev->spec->pool_name,
6002                                         rbd_dev->spec->image_name);
6003                         goto err_out_format;
6004                 }
6005         }
6006
6007         ret = rbd_dev_header_info(rbd_dev);
6008         if (ret)
6009                 goto err_out_watch;
6010
6011         /*
6012          * If this image is the one being mapped, we have pool name and
6013          * id, image name and id, and snap name - need to fill snap id.
6014          * Otherwise this is a parent image, identified by pool, image
6015          * and snap ids - need to fill in names for those ids.
6016          */
6017         if (!depth)
6018                 ret = rbd_spec_fill_snap_id(rbd_dev);
6019         else
6020                 ret = rbd_spec_fill_names(rbd_dev);
6021         if (ret) {
6022                 if (ret == -ENOENT)
6023                         pr_info("snap %s/%s@%s does not exist\n",
6024                                 rbd_dev->spec->pool_name,
6025                                 rbd_dev->spec->image_name,
6026                                 rbd_dev->spec->snap_name);
6027                 goto err_out_probe;
6028         }
6029
6030         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6031                 ret = rbd_dev_v2_parent_info(rbd_dev);
6032                 if (ret)
6033                         goto err_out_probe;
6034
6035                 /*
6036                  * Need to warn users if this image is the one being
6037                  * mapped and has a parent.
6038                  */
6039                 if (!depth && rbd_dev->parent_spec)
6040                         rbd_warn(rbd_dev,
6041                                  "WARNING: kernel layering is EXPERIMENTAL!");
6042         }
6043
6044         ret = rbd_dev_probe_parent(rbd_dev, depth);
6045         if (ret)
6046                 goto err_out_probe;
6047
6048         dout("discovered format %u image, header name is %s\n",
6049                 rbd_dev->image_format, rbd_dev->header_oid.name);
6050         return 0;
6051
6052 err_out_probe:
6053         rbd_dev_unprobe(rbd_dev);
6054 err_out_watch:
6055         if (!depth)
6056                 rbd_unregister_watch(rbd_dev);
6057 err_out_format:
6058         rbd_dev->image_format = 0;
6059         kfree(rbd_dev->spec->image_id);
6060         rbd_dev->spec->image_id = NULL;
6061         return ret;
6062 }
6063
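/*
 * Store side of the sysfs "add" interface.  Purely as an illustration
 * (the authoritative syntax is described in
 * Documentation/ABI/testing/sysfs-bus-rbd), a mapping request looks like:
 *
 *   # echo "<mon-addrs> <options> <pool-name> <image-name> [<snap-name>]" \
 *         > /sys/bus/rbd/add
 *
 * The string is parsed by rbd_add_parse_args(), the image is probed, and
 * on success a new rbd<N> block device is announced.
 */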
6064 static ssize_t do_rbd_add(struct bus_type *bus,
6065                           const char *buf,
6066                           size_t count)
6067 {
6068         struct rbd_device *rbd_dev = NULL;
6069         struct ceph_options *ceph_opts = NULL;
6070         struct rbd_options *rbd_opts = NULL;
6071         struct rbd_spec *spec = NULL;
6072         struct rbd_client *rbdc;
6073         bool read_only;
6074         int rc;
6075
6076         if (!try_module_get(THIS_MODULE))
6077                 return -ENODEV;
6078
6079         /* parse add command */
6080         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6081         if (rc < 0)
6082                 goto out;
6083
6084         rbdc = rbd_get_client(ceph_opts);
6085         if (IS_ERR(rbdc)) {
6086                 rc = PTR_ERR(rbdc);
6087                 goto err_out_args;
6088         }
6089
6090         /* pick the pool */
6091         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6092         if (rc < 0) {
6093                 if (rc == -ENOENT)
6094                         pr_info("pool %s does not exist\n", spec->pool_name);
6095                 goto err_out_client;
6096         }
6097         spec->pool_id = (u64)rc;
6098
6099         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6100         if (!rbd_dev) {
6101                 rc = -ENOMEM;
6102                 goto err_out_client;
6103         }
6104         rbdc = NULL;            /* rbd_dev now owns this */
6105         spec = NULL;            /* rbd_dev now owns this */
6106         rbd_opts = NULL;        /* rbd_dev now owns this */
6107
6108         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6109         if (!rbd_dev->config_info) {
6110                 rc = -ENOMEM;
6111                 goto err_out_rbd_dev;
6112         }
6113
6114         down_write(&rbd_dev->header_rwsem);
6115         rc = rbd_dev_image_probe(rbd_dev, 0);
6116         if (rc < 0) {
6117                 up_write(&rbd_dev->header_rwsem);
6118                 goto err_out_rbd_dev;
6119         }
6120
6121         /* If we are mapping a snapshot it must be marked read-only */
6122
6123         read_only = rbd_dev->opts->read_only;
6124         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6125                 read_only = true;
6126         rbd_dev->mapping.read_only = read_only;
6127
6128         rc = rbd_dev_device_setup(rbd_dev);
6129         if (rc) {
6130                 /*
6131                  * rbd_unregister_watch() can't be moved into
6132                  * rbd_dev_image_release() without refactoring, see
6133                  * commit 1f3ef78861ac.
6134                  */
6135                 rbd_unregister_watch(rbd_dev);
6136                 rbd_dev_image_release(rbd_dev);
6137                 goto out;
6138         }
6139
6140         rc = count;
6141 out:
6142         module_put(THIS_MODULE);
6143         return rc;
6144
6145 err_out_rbd_dev:
6146         rbd_dev_destroy(rbd_dev);
6147 err_out_client:
6148         rbd_put_client(rbdc);
6149 err_out_args:
6150         rbd_spec_put(spec);
6151         kfree(rbd_opts);
6152         goto out;
6153 }
6154
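/*
 * With the single_major module parameter enabled, writes to the plain
 * add/remove attributes fail with -EINVAL and userspace is expected to
 * use the add_single_major/remove_single_major attributes instead, so
 * that every mapping shares the one registered major.
 */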
6155 static ssize_t rbd_add(struct bus_type *bus,
6156                        const char *buf,
6157                        size_t count)
6158 {
6159         if (single_major)
6160                 return -EINVAL;
6161
6162         return do_rbd_add(bus, buf, count);
6163 }
6164
6165 static ssize_t rbd_add_single_major(struct bus_type *bus,
6166                                     const char *buf,
6167                                     size_t count)
6168 {
6169         return do_rbd_add(bus, buf, count);
6170 }
6171
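/*
 * Undo rbd_dev_device_setup(): take the device off the rbd_dev_list,
 * drop the gendisk and blkdev registration, and clear the mapping.
 */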
6172 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6173 {
6174         rbd_free_disk(rbd_dev);
6175
6176         spin_lock(&rbd_dev_list_lock);
6177         list_del_init(&rbd_dev->node);
6178         spin_unlock(&rbd_dev_list_lock);
6179
6180         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6181         device_del(&rbd_dev->dev);
6182         rbd_dev_mapping_clear(rbd_dev);
6183         if (!single_major)
6184                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6185 }
6186
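/*
 * Release the images in the parent chain.  Each pass of the loop below
 * releases the deepest remaining ancestor (the image with no parent of
 * its own), so the chain is torn down from the far end back towards the
 * mapped image.
 */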
6187 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6188 {
6189         while (rbd_dev->parent) {
6190                 struct rbd_device *first = rbd_dev;
6191                 struct rbd_device *second = first->parent;
6192                 struct rbd_device *third;
6193
6194                 /*
6195                  * Walk to the end of the parent chain (the ancestor with
6196                  * no parent of its own) and release that image.
6197                  */
6198                 while (second && (third = second->parent)) {
6199                         first = second;
6200                         second = third;
6201                 }
6202                 rbd_assert(second);
6203                 rbd_dev_image_release(second);
6204                 first->parent = NULL;
6205                 first->parent_overlap = 0;
6206
6207                 rbd_assert(first->parent_spec);
6208                 rbd_spec_put(first->parent_spec);
6209                 first->parent_spec = NULL;
6210         }
6211 }
6212
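/*
 * Store side of the sysfs "remove" interface.  Illustrative use only
 * (see Documentation/ABI/testing/sysfs-bus-rbd for the ABI):
 *
 *   # echo "<dev-id>" > /sys/bus/rbd/remove        - normal removal
 *   # echo "<dev-id> force" > /sys/bus/rbd/remove  - remove even if open
 *
 * where <dev-id> is the rbd device id: the N in the rbdN disk name and
 * the name of the corresponding entry under /sys/bus/rbd/devices/.
 */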
6213 static ssize_t do_rbd_remove(struct bus_type *bus,
6214                              const char *buf,
6215                              size_t count)
6216 {
6217         struct rbd_device *rbd_dev = NULL;
6218         struct list_head *tmp;
6219         int dev_id;
6220         char opt_buf[6];
6221         bool already = false;
6222         bool force = false;
6223         int ret;
6224
6225         dev_id = -1;
6226         opt_buf[0] = '\0';
6227         sscanf(buf, "%d %5s", &dev_id, opt_buf);
6228         if (dev_id < 0) {
6229                 pr_err("dev_id out of range\n");
6230                 return -EINVAL;
6231         }
6232         if (opt_buf[0] != '\0') {
6233                 if (!strcmp(opt_buf, "force")) {
6234                         force = true;
6235                 } else {
6236                         pr_err("bad remove option at '%s'\n", opt_buf);
6237                         return -EINVAL;
6238                 }
6239         }
6240
6241         ret = -ENOENT;
6242         spin_lock(&rbd_dev_list_lock);
6243         list_for_each(tmp, &rbd_dev_list) {
6244                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6245                 if (rbd_dev->dev_id == dev_id) {
6246                         ret = 0;
6247                         break;
6248                 }
6249         }
6250         if (!ret) {
6251                 spin_lock_irq(&rbd_dev->lock);
6252                 if (rbd_dev->open_count && !force)
6253                         ret = -EBUSY;
6254                 else
6255                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6256                                                         &rbd_dev->flags);
6257                 spin_unlock_irq(&rbd_dev->lock);
6258         }
6259         spin_unlock(&rbd_dev_list_lock);
6260         if (ret < 0 || already)
6261                 return ret;
6262
6263         if (force) {
6264                 /*
6265                  * Prevent new IO from being queued and wait for existing
6266                  * IO to complete/fail.
6267                  */
6268                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6269                 blk_set_queue_dying(rbd_dev->disk->queue);
6270         }
6271
6272         down_write(&rbd_dev->lock_rwsem);
6273         if (__rbd_is_lock_owner(rbd_dev))
6274                 rbd_unlock(rbd_dev);
6275         up_write(&rbd_dev->lock_rwsem);
6276         rbd_unregister_watch(rbd_dev);
6277
6278         /*
6279          * Don't free anything from rbd_dev->disk until after all
6280          * notifies are completely processed. Otherwise
6281          * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
6282          * in a potential use after free of rbd_dev->disk or rbd_dev.
6283          */
6284         rbd_dev_device_release(rbd_dev);
6285         rbd_dev_image_release(rbd_dev);
6286
6287         return count;
6288 }
6289
6290 static ssize_t rbd_remove(struct bus_type *bus,
6291                           const char *buf,
6292                           size_t count)
6293 {
6294         if (single_major)
6295                 return -EINVAL;
6296
6297         return do_rbd_remove(bus, buf, count);
6298 }
6299
6300 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6301                                        const char *buf,
6302                                        size_t count)
6303 {
6304         return do_rbd_remove(bus, buf, count);
6305 }
6306
6307 /*
6308  * create control files in sysfs
6309  * /sys/bus/rbd/...
6310  */
6311 static int rbd_sysfs_init(void)
6312 {
6313         int ret;
6314
6315         ret = device_register(&rbd_root_dev);
6316         if (ret < 0)
6317                 return ret;
6318
6319         ret = bus_register(&rbd_bus_type);
6320         if (ret < 0)
6321                 device_unregister(&rbd_root_dev);
6322
6323         return ret;
6324 }
6325
6326 static void rbd_sysfs_cleanup(void)
6327 {
6328         bus_unregister(&rbd_bus_type);
6329         device_unregister(&rbd_root_dev);
6330 }
6331
6332 static int rbd_slab_init(void)
6333 {
6334         rbd_assert(!rbd_img_request_cache);
6335         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6336         if (!rbd_img_request_cache)
6337                 return -ENOMEM;
6338
6339         rbd_assert(!rbd_obj_request_cache);
6340         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6341         if (!rbd_obj_request_cache)
6342                 goto out_err;
6343
6344         return 0;
6345
6346 out_err:
6347         kmem_cache_destroy(rbd_img_request_cache);
6348         rbd_img_request_cache = NULL;
6349         return -ENOMEM;
6350 }
6351
6352 static void rbd_slab_exit(void)
6353 {
6354         rbd_assert(rbd_obj_request_cache);
6355         kmem_cache_destroy(rbd_obj_request_cache);
6356         rbd_obj_request_cache = NULL;
6357
6358         rbd_assert(rbd_img_request_cache);
6359         kmem_cache_destroy(rbd_img_request_cache);
6360         rbd_img_request_cache = NULL;
6361 }
6362
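/*
 * Module init: set up the slab caches, the I/O workqueue, the shared
 * major (only in single_major mode) and the sysfs interface.  As an
 * illustration (assuming the usual bool module-parameter syntax),
 * single-major mode is selected at load time with something like:
 *
 *   # modprobe rbd single_major=Y
 */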
6363 static int __init rbd_init(void)
6364 {
6365         int rc;
6366
6367         if (!libceph_compatible(NULL)) {
6368                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6369                 return -EINVAL;
6370         }
6371
6372         rc = rbd_slab_init();
6373         if (rc)
6374                 return rc;
6375
6376         /*
6377          * The number of active work items is limited by the number of
6378          * rbd devices * queue depth, so leave @max_active at default.
6379          */
6380         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6381         if (!rbd_wq) {
6382                 rc = -ENOMEM;
6383                 goto err_out_slab;
6384         }
6385
6386         if (single_major) {
6387                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6388                 if (rbd_major < 0) {
6389                         rc = rbd_major;
6390                         goto err_out_wq;
6391                 }
6392         }
6393
6394         rc = rbd_sysfs_init();
6395         if (rc)
6396                 goto err_out_blkdev;
6397
6398         if (single_major)
6399                 pr_info("loaded (major %d)\n", rbd_major);
6400         else
6401                 pr_info("loaded\n");
6402
6403         return 0;
6404
6405 err_out_blkdev:
6406         if (single_major)
6407                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6408 err_out_wq:
6409         destroy_workqueue(rbd_wq);
6410 err_out_slab:
6411         rbd_slab_exit();
6412         return rc;
6413 }
6414
6415 static void __exit rbd_exit(void)
6416 {
6417         ida_destroy(&rbd_dev_id_ida);
6418         rbd_sysfs_cleanup();
6419         if (single_major)
6420                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6421         destroy_workqueue(rbd_wq);
6422         rbd_slab_exit();
6423 }
6424
6425 module_init(rbd_init);
6426 module_exit(rbd_exit);
6427
6428 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6429 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6430 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6431 /* following authorship retained from original osdblk.c */
6432 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6433
6434 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6435 MODULE_LICENSE("GPL");