
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
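/*
 * Illustrative arithmetic (assuming a 4096-byte page): each snapshot
 * contributes one 8-byte id to the snapshot context, so a maximal
 * context carries 510 * 8 = 4080 bytes of ids, which together with
 * the small ceph_snap_context header still fits within 4KB.
 */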

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
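/*
 * Worked example (illustrative): with a 4-byte int,
 * MAX_INT_FORMAT_WIDTH is (5 * 4) / 2 + 1 = 11 characters, enough
 * for the widest decimal value, "-2147483648".  "rbd" plus such an
 * id is therefore well within DEV_NAME_LEN (32).
 */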

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
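
/*
 * For example (hypothetical values), mapping the head of image
 * "myimage" in pool "rbd" might produce a spec along the lines of:
 *
 *	pool_id = 2, pool_name = "rbd",
 *	image_id = "1018e1f21bc2", image_name = "myimage",
 *	snap_id = CEPH_NOSNAP, snap_name = RBD_SNAP_HEAD_NAME
 */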

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
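
/*
 * Sketch of how these iterators are used (mirroring the completion
 * path later in this file):
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */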

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with a specific addr and configuration; create
 * one if it does not already exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

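/*
 * For instance (illustrative): given a descending snaps[] array of
 * { 12, 7, 3 }, bsearch() with this comparator locates id 7 at
 * index 1, while a search for a missing id such as 9 returns NULL.
 */
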
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        const char *snap_name = rbd_dev->spec->snap_name;
        u64 snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
                snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
                if (snap_id == CEPH_NOSNAP)
                        return -ENOENT;
        } else {
                snap_id = CEPH_NOSNAP;
        }

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        /* If we are mapping a snapshot it must be marked read-only */

        if (snap_id != CEPH_NOSNAP)
                rbd_dev->mapping.read_only = true;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

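/*
 * Worked example (illustrative): with obj_order 22 (4 MiB objects),
 * image offset 0x00c00123 falls in segment 3 at segment offset 0x123;
 * a 4 MiB request starting there is clipped by rbd_segment_length()
 * to 0x3ffedd bytes so it does not cross the segment boundary.
 */
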
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        bio_for_each_segment(bv, bio_src, idx) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

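/*
 * Sketch of a typical call (illustrative): clone the next "length"
 * bytes of a request's data, with bio_chain and bio_offset carried
 * across calls so each clone picks up where the previous one ended:
 *
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio_chain, &bio_offset,
 *					length, GFP_ATOMIC);
 *	if (!clone)
 *		return -ENOMEM;	(or -EINVAL if the chain ran out)
 */
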
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;

        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
1645         u64 snap_id;
1646
1647         rbd_assert(osd_req != NULL);
1648
1649         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1650         ceph_osdc_build_request(osd_req, obj_request->offset,
1651                         NULL, snap_id, NULL);
1652 }
1653
1654 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1655 {
1656         struct rbd_img_request *img_request = obj_request->img_request;
1657         struct ceph_osd_request *osd_req = obj_request->osd_req;
1658         struct ceph_snap_context *snapc;
1659         struct timespec mtime = CURRENT_TIME;
1660
1661         rbd_assert(osd_req != NULL);
1662
1663         snapc = img_request ? img_request->snapc : NULL;
1664         ceph_osdc_build_request(osd_req, obj_request->offset,
1665                         snapc, CEPH_NOSNAP, &mtime);
1666 }
1667
1668 static struct ceph_osd_request *rbd_osd_req_create(
1669                                         struct rbd_device *rbd_dev,
1670                                         bool write_request,
1671                                         struct rbd_obj_request *obj_request)
1672 {
1673         struct ceph_snap_context *snapc = NULL;
1674         struct ceph_osd_client *osdc;
1675         struct ceph_osd_request *osd_req;
1676
1677         if (obj_request_img_data_test(obj_request)) {
1678                 struct rbd_img_request *img_request = obj_request->img_request;
1679
1680                 rbd_assert(write_request ==
1681                                 img_request_write_test(img_request));
1682                 if (write_request)
1683                         snapc = img_request->snapc;
1684         }
1685
1686         /* Allocate and initialize the request, for the single op */
1687
1688         osdc = &rbd_dev->rbd_client->client->osdc;
1689         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1690         if (!osd_req)
1691                 return NULL;    /* ENOMEM */
1692
1693         if (write_request)
1694                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1695         else
1696                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1697
1698         osd_req->r_callback = rbd_osd_req_callback;
1699         osd_req->r_priv = obj_request;
1700
1701         osd_req->r_oid_len = strlen(obj_request->object_name);
1702         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1703         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1704
1705         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1706
1707         return osd_req;
1708 }
1709
1710 /*
1711  * Create a copyup osd request based on the information in the
1712  * object request supplied.  A copyup request has two osd ops,
1713  * a copyup method call, and a "normal" write request.
1714  */
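/*
 * Resulting op layout (a sketch of what the caller fills in later):
 *   op 0: CEPH_OSD_OP_CALL  -- "rbd" class "copyup" method, fed the
 *         parent data pages as request data
 *   op 1: CEPH_OSD_OP_WRITE -- the original write payload
 */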
1715 static struct ceph_osd_request *
1716 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1717 {
1718         struct rbd_img_request *img_request;
1719         struct ceph_snap_context *snapc;
1720         struct rbd_device *rbd_dev;
1721         struct ceph_osd_client *osdc;
1722         struct ceph_osd_request *osd_req;
1723
1724         rbd_assert(obj_request_img_data_test(obj_request));
1725         img_request = obj_request->img_request;
1726         rbd_assert(img_request);
1727         rbd_assert(img_request_write_test(img_request));
1728
1729         /* Allocate and initialize the request, for the two ops */
1730
1731         snapc = img_request->snapc;
1732         rbd_dev = img_request->rbd_dev;
1733         osdc = &rbd_dev->rbd_client->client->osdc;
1734         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1735         if (!osd_req)
1736                 return NULL;    /* ENOMEM */
1737
1738         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1739         osd_req->r_callback = rbd_osd_req_callback;
1740         osd_req->r_priv = obj_request;
1741
1742         osd_req->r_oid_len = strlen(obj_request->object_name);
1743         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1744         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1745
1746         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1747
1748         return osd_req;
1749 }
1750
1751
1752 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1753 {
1754         ceph_osdc_put_request(osd_req);
1755 }
1756
1757 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1758
1759 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1760                                                 u64 offset, u64 length,
1761                                                 enum obj_request_type type)
1762 {
1763         struct rbd_obj_request *obj_request;
1764         size_t size;
1765         char *name;
1766
1767         rbd_assert(obj_request_type_valid(type));
1768
1769         size = strlen(object_name) + 1;
1770         name = kmalloc(size, GFP_KERNEL);
1771         if (!name)
1772                 return NULL;
1773
1774         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1775         if (!obj_request) {
1776                 kfree(name);
1777                 return NULL;
1778         }
1779
1780         obj_request->object_name = memcpy(name, object_name, size);
1781         obj_request->offset = offset;
1782         obj_request->length = length;
1783         obj_request->flags = 0;
1784         obj_request->which = BAD_WHICH;
1785         obj_request->type = type;
1786         INIT_LIST_HEAD(&obj_request->links);
1787         init_completion(&obj_request->completion);
1788         kref_init(&obj_request->kref);
1789
1790         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1791                 offset, length, (int)type, obj_request);
1792
1793         return obj_request;
1794 }
1795
1796 static void rbd_obj_request_destroy(struct kref *kref)
1797 {
1798         struct rbd_obj_request *obj_request;
1799
1800         obj_request = container_of(kref, struct rbd_obj_request, kref);
1801
1802         dout("%s: obj %p\n", __func__, obj_request);
1803
1804         rbd_assert(obj_request->img_request == NULL);
1805         rbd_assert(obj_request->which == BAD_WHICH);
1806
1807         if (obj_request->osd_req)
1808                 rbd_osd_req_destroy(obj_request->osd_req);
1809
1810         rbd_assert(obj_request_type_valid(obj_request->type));
1811         switch (obj_request->type) {
1812         case OBJ_REQUEST_NODATA:
1813                 break;          /* Nothing to do */
1814         case OBJ_REQUEST_BIO:
1815                 if (obj_request->bio_list)
1816                         bio_chain_put(obj_request->bio_list);
1817                 break;
1818         case OBJ_REQUEST_PAGES:
1819                 if (obj_request->pages)
1820                         ceph_release_page_vector(obj_request->pages,
1821                                                 obj_request->page_count);
1822                 break;
1823         }
1824
1825         kfree(obj_request->object_name);
1826         obj_request->object_name = NULL;
1827         kmem_cache_free(rbd_obj_request_cache, obj_request);
1828 }
1829
1830 /*
1831  * Caller is responsible for filling in the list of object requests
1832  * that comprises the image request, and the Linux request pointer
1833  * (if there is one).
1834  */
1835 static struct rbd_img_request *rbd_img_request_create(
1836                                         struct rbd_device *rbd_dev,
1837                                         u64 offset, u64 length,
1838                                         bool write_request,
1839                                         bool child_request)
1840 {
1841         struct rbd_img_request *img_request;
1842
1843         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1844         if (!img_request)
1845                 return NULL;
1846
1847         if (write_request) {
1848                 down_read(&rbd_dev->header_rwsem);
1849                 ceph_get_snap_context(rbd_dev->header.snapc);
1850                 up_read(&rbd_dev->header_rwsem);
1851         }
1852
1853         img_request->rq = NULL;
1854         img_request->rbd_dev = rbd_dev;
1855         img_request->offset = offset;
1856         img_request->length = length;
1857         img_request->flags = 0;
1858         if (write_request) {
1859                 img_request_write_set(img_request);
1860                 img_request->snapc = rbd_dev->header.snapc;
1861         } else {
1862                 img_request->snap_id = rbd_dev->spec->snap_id;
1863         }
1864         if (child_request)
1865                 img_request_child_set(img_request);
1866         if (rbd_dev->parent_spec)
1867                 img_request_layered_set(img_request);
1868         spin_lock_init(&img_request->completion_lock);
1869         img_request->next_completion = 0;
1870         img_request->callback = NULL;
1871         img_request->result = 0;
1872         img_request->obj_request_count = 0;
1873         INIT_LIST_HEAD(&img_request->obj_requests);
1874         kref_init(&img_request->kref);
1875
1876         rbd_img_request_get(img_request);       /* Avoid a warning */
1877         rbd_img_request_put(img_request);       /* TEMPORARY */
1878
1879         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1880                 write_request ? "write" : "read", offset, length,
1881                 img_request);
1882
1883         return img_request;
1884 }
1885
1886 static void rbd_img_request_destroy(struct kref *kref)
1887 {
1888         struct rbd_img_request *img_request;
1889         struct rbd_obj_request *obj_request;
1890         struct rbd_obj_request *next_obj_request;
1891
1892         img_request = container_of(kref, struct rbd_img_request, kref);
1893
1894         dout("%s: img %p\n", __func__, img_request);
1895
1896         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1897                 rbd_img_obj_request_del(img_request, obj_request);
1898         rbd_assert(img_request->obj_request_count == 0);
1899
1900         if (img_request_write_test(img_request))
1901                 ceph_put_snap_context(img_request->snapc);
1902
1903         if (img_request_child_test(img_request))
1904                 rbd_obj_request_put(img_request->obj_request);
1905
1906         kmem_cache_free(rbd_img_request_cache, img_request);
1907 }
1908
1909 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1910 {
1911         struct rbd_img_request *img_request;
1912         unsigned int xferred;
1913         int result;
1914         bool more;
1915
1916         rbd_assert(obj_request_img_data_test(obj_request));
1917         img_request = obj_request->img_request;
1918
1919         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1920         xferred = (unsigned int)obj_request->xferred;
1921         result = obj_request->result;
1922         if (result) {
1923                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1924
1925                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1926                         img_request_write_test(img_request) ? "write" : "read",
1927                         obj_request->length, obj_request->img_offset,
1928                         obj_request->offset);
1929                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1930                         result, xferred);
1931                 if (!img_request->result)
1932                         img_request->result = result;
1933         }
1934
1935         /* Image object requests don't own their page array */
1936
1937         if (obj_request->type == OBJ_REQUEST_PAGES) {
1938                 obj_request->pages = NULL;
1939                 obj_request->page_count = 0;
1940         }
1941
1942         if (img_request_child_test(img_request)) {
1943                 rbd_assert(img_request->obj_request != NULL);
1944                 more = obj_request->which < img_request->obj_request_count - 1;
1945         } else {
1946                 rbd_assert(img_request->rq != NULL);
1947                 more = blk_end_request(img_request->rq, result, xferred);
1948         }
1949
1950         return more;
1951 }
1952
1953 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1954 {
1955         struct rbd_img_request *img_request;
1956         u32 which = obj_request->which;
1957         bool more = true;
1958
1959         rbd_assert(obj_request_img_data_test(obj_request));
1960         img_request = obj_request->img_request;
1961
1962         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1963         rbd_assert(img_request != NULL);
1964         rbd_assert(img_request->obj_request_count > 0);
1965         rbd_assert(which != BAD_WHICH);
1966         rbd_assert(which < img_request->obj_request_count);
1967         rbd_assert(which >= img_request->next_completion);
1968
1969         spin_lock_irq(&img_request->completion_lock);
1970         if (which != img_request->next_completion)
1971                 goto out;
1972
1973         for_each_obj_request_from(img_request, obj_request) {
1974                 rbd_assert(more);
1975                 rbd_assert(which < img_request->obj_request_count);
1976
1977                 if (!obj_request_done_test(obj_request))
1978                         break;
1979                 more = rbd_img_obj_end_request(obj_request);
1980                 which++;
1981         }
1982
1983         rbd_assert(more ^ (which == img_request->obj_request_count));
1984         img_request->next_completion = which;
1985 out:
1986         spin_unlock_irq(&img_request->completion_lock);
1987
1988         if (!more)
1989                 rbd_img_request_complete(img_request);
1990 }
1991
1992 /*
1993  * Split up an image request into one or more object requests, each
1994  * to a different object.  The "type" parameter indicates whether
1995  * "data_desc" is the pointer to the head of a list of bio
1996  * structures, or the base of a page array.  In either case this
1997  * function assumes data_desc describes memory sufficient to hold
1998  * all data described by the image request.
1999  */
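/*
 * For example (assuming the default 4 MiB objects): a 2 MiB image
 * request starting at image offset 3 MiB becomes two object requests,
 * one for the final 1 MiB of one object and one for the first 1 MiB
 * of the next.
 */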
2000 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2001                                         enum obj_request_type type,
2002                                         void *data_desc)
2003 {
2004         struct rbd_device *rbd_dev = img_request->rbd_dev;
2005         struct rbd_obj_request *obj_request = NULL;
2006         struct rbd_obj_request *next_obj_request;
2007         bool write_request = img_request_write_test(img_request);
2008         struct bio *bio_list;
2009         unsigned int bio_offset = 0;
2010         struct page **pages;
2011         u64 img_offset;
2012         u64 resid;
2013         u16 opcode;
2014
2015         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2016                 (int)type, data_desc);
2017
2018         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2019         img_offset = img_request->offset;
2020         resid = img_request->length;
2021         rbd_assert(resid > 0);
2022
2023         if (type == OBJ_REQUEST_BIO) {
2024                 bio_list = data_desc;
2025                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2026         } else {
2027                 rbd_assert(type == OBJ_REQUEST_PAGES);
2028                 pages = data_desc;
2029         }
2030
2031         while (resid) {
2032                 struct ceph_osd_request *osd_req;
2033                 const char *object_name;
2034                 u64 offset;
2035                 u64 length;
2036
2037                 object_name = rbd_segment_name(rbd_dev, img_offset);
2038                 if (!object_name)
2039                         goto out_unwind;
2040                 offset = rbd_segment_offset(rbd_dev, img_offset);
2041                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2042                 obj_request = rbd_obj_request_create(object_name,
2043                                                 offset, length, type);
2044                 /* object request has its own copy of the object name */
2045                 rbd_segment_name_free(object_name);
2046                 if (!obj_request)
2047                         goto out_unwind;
2048
2049                 if (type == OBJ_REQUEST_BIO) {
2050                         unsigned int clone_size;
2051
2052                         rbd_assert(length <= (u64)UINT_MAX);
2053                         clone_size = (unsigned int)length;
2054                         obj_request->bio_list =
2055                                         bio_chain_clone_range(&bio_list,
2056                                                                 &bio_offset,
2057                                                                 clone_size,
2058                                                                 GFP_ATOMIC);
2059                         if (!obj_request->bio_list)
2060                                 goto out_partial;
2061                 } else {
2062                         unsigned int page_count;
2063
2064                         obj_request->pages = pages;
2065                         page_count = (u32)calc_pages_for(offset, length);
2066                         obj_request->page_count = page_count;
2067                         if ((offset + length) & ~PAGE_MASK)
2068                                 page_count--;   /* more on last page */
2069                         pages += page_count;
2070                 }
2071
2072                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2073                                                 obj_request);
2074                 if (!osd_req)
2075                         goto out_partial;
2076                 obj_request->osd_req = osd_req;
2077                 obj_request->callback = rbd_img_obj_callback;
2078
2079                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2080                                                 0, 0);
2081                 if (type == OBJ_REQUEST_BIO)
2082                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2083                                         obj_request->bio_list, length);
2084                 else
2085                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2086                                         obj_request->pages, length,
2087                                         offset & ~PAGE_MASK, false, false);
2088
2089                 if (write_request)
2090                         rbd_osd_req_format_write(obj_request);
2091                 else
2092                         rbd_osd_req_format_read(obj_request);
2093
2094                 obj_request->img_offset = img_offset;
2095                 rbd_img_obj_request_add(img_request, obj_request);
2096
2097                 img_offset += length;
2098                 resid -= length;
2099         }
2100
2101         return 0;
2102
2103 out_partial:
2104         rbd_obj_request_put(obj_request);
2105 out_unwind:
2106         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2107                 rbd_obj_request_put(obj_request);
2108
2109         return -ENOMEM;
2110 }
2111
2112 static void
2113 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2114 {
2115         struct rbd_img_request *img_request;
2116         struct rbd_device *rbd_dev;
2117         u64 length;
2118         u32 page_count;
2119
2120         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2121         rbd_assert(obj_request_img_data_test(obj_request));
2122         img_request = obj_request->img_request;
2123         rbd_assert(img_request);
2124
2125         rbd_dev = img_request->rbd_dev;
2126         rbd_assert(rbd_dev);
2127         length = (u64)1 << rbd_dev->header.obj_order;
2128         page_count = (u32)calc_pages_for(0, length);
2129
2130         rbd_assert(obj_request->copyup_pages);
2131         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2132         obj_request->copyup_pages = NULL;
2133
2134         /*
2135          * We want the transfer count to reflect the size of the
2136          * original write request.  There is no such thing as a
2137          * successful short write, so if the request was successful
2138          * we can just set it to the originally-requested length.
2139          */
2140         if (!obj_request->result)
2141                 obj_request->xferred = obj_request->length;
2142
2143         /* Finish up with the normal image object callback */
2144
2145         rbd_img_obj_callback(obj_request);
2146 }
2147
2148 static void
2149 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2150 {
2151         struct rbd_obj_request *orig_request;
2152         struct ceph_osd_request *osd_req;
2153         struct ceph_osd_client *osdc;
2154         struct rbd_device *rbd_dev;
2155         struct page **pages;
2156         int result;
2157         u64 obj_size;
2158         u64 xferred;
2159
2160         rbd_assert(img_request_child_test(img_request));
2161
2162         /* First get what we need from the image request */
2163
2164         pages = img_request->copyup_pages;
2165         rbd_assert(pages != NULL);
2166         img_request->copyup_pages = NULL;
2167
2168         orig_request = img_request->obj_request;
2169         rbd_assert(orig_request != NULL);
2170         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2171         result = img_request->result;
2172         obj_size = img_request->length;
2173         xferred = img_request->xferred;
2174
2175         rbd_dev = img_request->rbd_dev;
2176         rbd_assert(rbd_dev);
2177         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2178
2179         rbd_img_request_put(img_request);
2180
2181         if (result)
2182                 goto out_err;
2183
2184         /* Allocate the new copyup osd request for the original request */
2185
2186         result = -ENOMEM;
2187         rbd_assert(!orig_request->osd_req);
2188         osd_req = rbd_osd_req_create_copyup(orig_request);
2189         if (!osd_req)
2190                 goto out_err;
2191         orig_request->osd_req = osd_req;
2192         orig_request->copyup_pages = pages;
2193
2194         /* Initialize the copyup op */
2195
2196         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2197         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2198                                                 false, false);
2199
2200         /* Then the original write request op */
2201
2202         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2203                                         orig_request->offset,
2204                                         orig_request->length, 0, 0);
2205         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2206                                         orig_request->length);
2207
2208         rbd_osd_req_format_write(orig_request);
2209
2210         /* All set, send it off. */
2211
2212         orig_request->callback = rbd_img_obj_copyup_callback;
2213         osdc = &rbd_dev->rbd_client->client->osdc;
2214         result = rbd_obj_request_submit(osdc, orig_request);
2215         if (!result)
2216                 return;
2217 out_err:
2218         /* Record the error code and complete the request */
2219
2220         orig_request->result = result;
2221         orig_request->xferred = 0;
2222         obj_request_done_set(orig_request);
2223         rbd_obj_request_complete(orig_request);
2224 }
2225
2226 /*
2227  * Read from the parent image the range of data that covers the
2228  * entire target of the given object request.  This is used for
2229  * satisfying a layered image write request when the target of an
2230  * object request from the image request does not exist.
2231  *
2232  * A page array big enough to hold the returned data is allocated
2233  * and supplied to rbd_img_request_fill() as the "data descriptor."
2234  * When the read completes, this page array will be transferred to
2235  * the original object request for the copyup operation.
2236  *
2237  * If an error occurs, record it as the result of the original
2238  * object request and mark it done so it gets completed.
2239  */
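/*
 * Sequence sketch:
 *   1. Read the full range covered by the target object from the
 *      parent image into a freshly allocated page array.
 *   2. On completion, rbd_img_obj_parent_read_full_callback() builds
 *      a two-op copyup request (method call plus the original write).
 *   3. Submit that request to the target object.
 */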
2240 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2241 {
2242         struct rbd_img_request *img_request = NULL;
2243         struct rbd_img_request *parent_request = NULL;
2244         struct rbd_device *rbd_dev;
2245         u64 img_offset;
2246         u64 length;
2247         struct page **pages = NULL;
2248         u32 page_count;
2249         int result;
2250
2251         rbd_assert(obj_request_img_data_test(obj_request));
2252         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2253
2254         img_request = obj_request->img_request;
2255         rbd_assert(img_request != NULL);
2256         rbd_dev = img_request->rbd_dev;
2257         rbd_assert(rbd_dev->parent != NULL);
2258
2259         /*
2260          * First things first.  The original osd request is of no
2261          * use to us any more; we'll need a new one that can hold
2262          * the two ops in a copyup request.  We'll get that later,
2263          * but for now we can release the old one.
2264          */
2265         rbd_osd_req_destroy(obj_request->osd_req);
2266         obj_request->osd_req = NULL;
2267
2268         /*
2269          * Determine the byte range covered by the object in the
2270          * child image to which the original request was to be sent.
2271          */
2272         img_offset = obj_request->img_offset - obj_request->offset;
2273         length = (u64)1 << rbd_dev->header.obj_order;
2274
2275         /*
2276          * There is no defined parent data beyond the parent
2277          * overlap, so limit what we read at that boundary if
2278          * necessary.
2279          */
2280         if (img_offset + length > rbd_dev->parent_overlap) {
2281                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2282                 length = rbd_dev->parent_overlap - img_offset;
2283         }
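        /*
         * Illustrative values: with 4 MiB objects, a write whose
         * target object covers [4 MiB, 8 MiB) under a parent overlap
         * of 6 MiB results in a parent read of [4 MiB, 6 MiB).
         */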
2284
2285         /*
2286          * Allocate a page array big enough to receive the data read
2287          * from the parent.
2288          */
2289         page_count = (u32)calc_pages_for(0, length);
2290         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2291         if (IS_ERR(pages)) {
2292                 result = PTR_ERR(pages);
2293                 pages = NULL;
2294                 goto out_err;
2295         }
2296
2297         result = -ENOMEM;
2298         parent_request = rbd_img_request_create(rbd_dev->parent,
2299                                                 img_offset, length,
2300                                                 false, true);
2301         if (!parent_request)
2302                 goto out_err;
2303         rbd_obj_request_get(obj_request);
2304         parent_request->obj_request = obj_request;
2305
2306         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2307         if (result)
2308                 goto out_err;
2309         parent_request->copyup_pages = pages;
2310
2311         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2312         result = rbd_img_request_submit(parent_request);
2313         if (!result)
2314                 return 0;
2315
2316         parent_request->copyup_pages = NULL;
2317         parent_request->obj_request = NULL;
2318         rbd_obj_request_put(obj_request);
2319 out_err:
2320         if (pages)
2321                 ceph_release_page_vector(pages, page_count);
2322         if (parent_request)
2323                 rbd_img_request_put(parent_request);
2324         obj_request->result = result;
2325         obj_request->xferred = 0;
2326         obj_request_done_set(obj_request);
2327
2328         return result;
2329 }
2330
2331 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2332 {
2333         struct rbd_obj_request *orig_request;
2334         int result;
2335
2336         rbd_assert(!obj_request_img_data_test(obj_request));
2337
2338         /*
2339          * All we need from the object request is the original
2340          * request and the result of the STAT op.  Grab those, then
2341          * we're done with the request.
2342          */
2343         orig_request = obj_request->obj_request;
2344         obj_request->obj_request = NULL;
2345         rbd_assert(orig_request);
2346         rbd_assert(orig_request->img_request);
2347
2348         result = obj_request->result;
2349         obj_request->result = 0;
2350
2351         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2352                 obj_request, orig_request, result,
2353                 obj_request->xferred, obj_request->length);
2354         rbd_obj_request_put(obj_request);
2355
2356         rbd_assert(orig_request);
2357         rbd_assert(orig_request->img_request);
2358
2359         /*
2360          * Our only purpose here is to determine whether the object
2361          * exists, and we don't want to treat the non-existence as
2362          * an error.  If something else comes back, transfer the
2363          * error to the original request and complete it now.
2364          */
2365         if (!result) {
2366                 obj_request_existence_set(orig_request, true);
2367         } else if (result == -ENOENT) {
2368                 obj_request_existence_set(orig_request, false);
2369         } else if (result) {
2370                 orig_request->result = result;
2371                 goto out;
2372         }
2373
2374         /*
2375          * Resubmit the original request now that we have recorded
2376          * whether the target object exists.
2377          */
2378         orig_request->result = rbd_img_obj_request_submit(orig_request);
2379 out:
2380         if (orig_request->result)
2381                 rbd_obj_request_complete(orig_request);
2382         rbd_obj_request_put(orig_request);
2383 }
2384
2385 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2386 {
2387         struct rbd_obj_request *stat_request;
2388         struct rbd_device *rbd_dev;
2389         struct ceph_osd_client *osdc;
2390         struct page **pages = NULL;
2391         u32 page_count;
2392         size_t size;
2393         int ret;
2394
2395         /*
2396          * The response data for a STAT call consists of:
2397          *     le64 length;
2398          *     struct {
2399          *         le32 tv_sec;
2400          *         le32 tv_nsec;
2401          *     } mtime;
2402          */
2403         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2404         page_count = (u32)calc_pages_for(0, size);
2405         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2406         if (IS_ERR(pages))
2407                 return PTR_ERR(pages);
2408
2409         ret = -ENOMEM;
2410         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2411                                                         OBJ_REQUEST_PAGES);
2412         if (!stat_request)
2413                 goto out;
2414
2415         rbd_obj_request_get(obj_request);
2416         stat_request->obj_request = obj_request;
2417         stat_request->pages = pages;
2418         stat_request->page_count = page_count;
2419
2420         rbd_assert(obj_request->img_request);
2421         rbd_dev = obj_request->img_request->rbd_dev;
2422         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2423                                                 stat_request);
2424         if (!stat_request->osd_req)
2425                 goto out;
2426         stat_request->callback = rbd_img_obj_exists_callback;
2427
2428         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2429         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2430                                         false, false);
2431         rbd_osd_req_format_read(stat_request);
2432
2433         osdc = &rbd_dev->rbd_client->client->osdc;
2434         ret = rbd_obj_request_submit(osdc, stat_request);
2435 out:
2436         if (ret)
2437                 rbd_obj_request_put(obj_request);
2438
2439         return ret;
2440 }
2441
2442 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2443 {
2444         struct rbd_img_request *img_request;
2445         struct rbd_device *rbd_dev;
2446         bool known;
2447
2448         rbd_assert(obj_request_img_data_test(obj_request));
2449
2450         img_request = obj_request->img_request;
2451         rbd_assert(img_request);
2452         rbd_dev = img_request->rbd_dev;
2453
2454         /*
2455          * Only writes to layered images need special handling.
2456          * Reads and non-layered writes are simple object requests.
2457          * Layered writes that start beyond the end of the overlap
2458          * with the parent have no parent data, so they too are
2459          * simple object requests.  Finally, if the target object is
2460          * known to already exist, its parent data has already been
2461          * copied, so a write to the object can also be handled as a
2462          * simple object request.
2463          */
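        /*
         * Decision sketch:
         *   read, non-layered write, write beyond the overlap,
         *   or target known to exist  -> plain object request (here)
         *   target known not to exist -> parent read for copyup
         *   existence unknown         -> STAT the target first
         */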
2464         if (!img_request_write_test(img_request) ||
2465                 !img_request_layered_test(img_request) ||
2466                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2467                 ((known = obj_request_known_test(obj_request)) &&
2468                         obj_request_exists_test(obj_request))) {
2469
2470                 struct rbd_device *rbd_dev;
2471                 struct ceph_osd_client *osdc;
2472
2473                 rbd_dev = obj_request->img_request->rbd_dev;
2474                 osdc = &rbd_dev->rbd_client->client->osdc;
2475
2476                 return rbd_obj_request_submit(osdc, obj_request);
2477         }
2478
2479         /*
2480          * It's a layered write.  The target object might exist but
2481          * we may not know that yet.  If we know it doesn't exist,
2482          * start by reading the data for the full target object from
2483          * the parent so we can use it for a copyup to the target.
2484          */
2485         if (known)
2486                 return rbd_img_obj_parent_read_full(obj_request);
2487
2488         /* We don't know whether the target exists.  Go find out. */
2489
2490         return rbd_img_obj_exists_submit(obj_request);
2491 }
2492
2493 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2494 {
2495         struct rbd_obj_request *obj_request;
2496         struct rbd_obj_request *next_obj_request;
2497
2498         dout("%s: img %p\n", __func__, img_request);
2499         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2500                 int ret;
2501
2502                 ret = rbd_img_obj_request_submit(obj_request);
2503                 if (ret)
2504                         return ret;
2505         }
2506
2507         return 0;
2508 }
2509
2510 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2511 {
2512         struct rbd_obj_request *obj_request;
2513         struct rbd_device *rbd_dev;
2514         u64 obj_end;
2515
2516         rbd_assert(img_request_child_test(img_request));
2517
2518         obj_request = img_request->obj_request;
2519         rbd_assert(obj_request);
2520         rbd_assert(obj_request->img_request);
2521
2522         obj_request->result = img_request->result;
2523         if (obj_request->result)
2524                 goto out;
2525
2526         /*
2527          * We need to zero anything beyond the parent overlap
2528          * boundary.  Since rbd_img_obj_request_read_callback()
2529          * will zero anything beyond the end of a short read, an
2530          * easy way to do this is to pretend the data from the
2531          * parent came up short--ending at the overlap boundary.
2532          */
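        /*
         * Illustrative values: overlap 1 MiB, request covering
         * [768 KiB, 1280 KiB): xferred is clamped to 256 KiB, so the
         * read callback zero-fills the final 256 KiB of the request.
         */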
2533         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2534         obj_end = obj_request->img_offset + obj_request->length;
2535         rbd_dev = obj_request->img_request->rbd_dev;
2536         if (obj_end > rbd_dev->parent_overlap) {
2537                 u64 xferred = 0;
2538
2539                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2540                         xferred = rbd_dev->parent_overlap -
2541                                         obj_request->img_offset;
2542
2543                 obj_request->xferred = min(img_request->xferred, xferred);
2544         } else {
2545                 obj_request->xferred = img_request->xferred;
2546         }
2547 out:
2548         rbd_img_request_put(img_request);
2549         rbd_img_obj_request_read_callback(obj_request);
2550         rbd_obj_request_complete(obj_request);
2551 }
2552
2553 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2554 {
2555         struct rbd_device *rbd_dev;
2556         struct rbd_img_request *img_request;
2557         int result;
2558
2559         rbd_assert(obj_request_img_data_test(obj_request));
2560         rbd_assert(obj_request->img_request != NULL);
2561         rbd_assert(obj_request->result == (s32) -ENOENT);
2562         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2563
2564         rbd_dev = obj_request->img_request->rbd_dev;
2565         rbd_assert(rbd_dev->parent != NULL);
2566         /* rbd_read_finish(obj_request, obj_request->length); */
2567         img_request = rbd_img_request_create(rbd_dev->parent,
2568                                                 obj_request->img_offset,
2569                                                 obj_request->length,
2570                                                 false, true);
2571         result = -ENOMEM;
2572         if (!img_request)
2573                 goto out_err;
2574
2575         rbd_obj_request_get(obj_request);
2576         img_request->obj_request = obj_request;
2577
2578         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2579                                         obj_request->bio_list);
2580         if (result)
2581                 goto out_err;
2582
2583         img_request->callback = rbd_img_parent_read_callback;
2584         result = rbd_img_request_submit(img_request);
2585         if (result)
2586                 goto out_err;
2587
2588         return;
2589 out_err:
2590         if (img_request)
2591                 rbd_img_request_put(img_request);
2592         obj_request->result = result;
2593         obj_request->xferred = 0;
2594         obj_request_done_set(obj_request);
2595 }
2596
2597 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2598 {
2599         struct rbd_obj_request *obj_request;
2600         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2601         int ret;
2602
2603         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2604                                                         OBJ_REQUEST_NODATA);
2605         if (!obj_request)
2606                 return -ENOMEM;
2607
2608         ret = -ENOMEM;
2609         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2610         if (!obj_request->osd_req)
2611                 goto out;
2612         obj_request->callback = rbd_obj_request_put;
2613
2614         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2615                                         notify_id, 0, 0);
2616         rbd_osd_req_format_read(obj_request);
2617
2618         ret = rbd_obj_request_submit(osdc, obj_request);
2619 out:
2620         if (ret)
2621                 rbd_obj_request_put(obj_request);
2622
2623         return ret;
2624 }
2625
2626 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2627 {
2628         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2629
2630         if (!rbd_dev)
2631                 return;
2632
2633         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2634                 rbd_dev->header_name, (unsigned long long)notify_id,
2635                 (unsigned int)opcode);
2636         (void)rbd_dev_refresh(rbd_dev);
2637
2638         rbd_obj_notify_ack(rbd_dev, notify_id);
2639 }
2640
2641 /*
2642  * Request sync osd watch/unwatch.  The value of "start" determines
2643  * whether a watch request is being initiated or torn down.
2644  */
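/*
 * Usage sketch: rbd_dev_header_watch_sync(rbd_dev, 1) registers a
 * lingering watch on the header object; a later call with start == 0
 * tears it down again.
 */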
2645 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2646 {
2647         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2648         struct rbd_obj_request *obj_request;
2649         int ret;
2650
2651         rbd_assert(start ^ !!rbd_dev->watch_event);
2652         rbd_assert(start ^ !!rbd_dev->watch_request);
2653
2654         if (start) {
2655                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2656                                                 &rbd_dev->watch_event);
2657                 if (ret < 0)
2658                         return ret;
2659                 rbd_assert(rbd_dev->watch_event != NULL);
2660         }
2661
2662         ret = -ENOMEM;
2663         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2664                                                         OBJ_REQUEST_NODATA);
2665         if (!obj_request)
2666                 goto out_cancel;
2667
2668         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2669         if (!obj_request->osd_req)
2670                 goto out_cancel;
2671
2672         if (start)
2673                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2674         else
2675                 ceph_osdc_unregister_linger_request(osdc,
2676                                         rbd_dev->watch_request->osd_req);
2677
2678         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2679                                 rbd_dev->watch_event->cookie, 0, start);
2680         rbd_osd_req_format_write(obj_request);
2681
2682         ret = rbd_obj_request_submit(osdc, obj_request);
2683         if (ret)
2684                 goto out_cancel;
2685         ret = rbd_obj_request_wait(obj_request);
2686         if (ret)
2687                 goto out_cancel;
2688         ret = obj_request->result;
2689         if (ret)
2690                 goto out_cancel;
2691
2692         /*
2693          * A watch request is set to linger, so the underlying osd
2694          * request won't go away until we unregister it.  We retain
2695          * a pointer to the object request during that time (in
2696          * rbd_dev->watch_request), so we'll keep a reference to
2697          * it.  We'll drop that reference (below) after we've
2698          * unregistered it.
2699          */
2700         if (start) {
2701                 rbd_dev->watch_request = obj_request;
2702
2703                 return 0;
2704         }
2705
2706         /* We have successfully torn down the watch request */
2707
2708         rbd_obj_request_put(rbd_dev->watch_request);
2709         rbd_dev->watch_request = NULL;
2710 out_cancel:
2711         /* Cancel the event if we're tearing down, or on error */
2712         ceph_osdc_cancel_event(rbd_dev->watch_event);
2713         rbd_dev->watch_event = NULL;
2714         if (obj_request)
2715                 rbd_obj_request_put(obj_request);
2716
2717         return ret;
2718 }
2719
2720 /*
2721  * Synchronous osd object method call.  Returns the number of bytes
2722  * returned in the outbound buffer, or a negative error code.
2723  */
2724 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2725                              const char *object_name,
2726                              const char *class_name,
2727                              const char *method_name,
2728                              const void *outbound,
2729                              size_t outbound_size,
2730                              void *inbound,
2731                              size_t inbound_size)
2732 {
2733         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2734         struct rbd_obj_request *obj_request;
2735         struct page **pages;
2736         u32 page_count;
2737         int ret;
2738
2739         /*
2740          * Method calls are ultimately read operations.  The result
2741          * should be placed into the inbound buffer provided.  They
2742          * also supply outbound data--parameters for the object
2743          * method.  Currently if this is present it will be a
2744          * snapshot id.
2745          */
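        /*
         * A typical call (as used elsewhere in this file) looks
         * roughly like:
         *
         *   rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
         *                       "rbd", "get_size",
         *                       &snapid, sizeof (snapid),
         *                       &size_buf, sizeof (size_buf));
         */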
2746         page_count = (u32)calc_pages_for(0, inbound_size);
2747         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2748         if (IS_ERR(pages))
2749                 return PTR_ERR(pages);
2750
2751         ret = -ENOMEM;
2752         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2753                                                         OBJ_REQUEST_PAGES);
2754         if (!obj_request)
2755                 goto out;
2756
2757         obj_request->pages = pages;
2758         obj_request->page_count = page_count;
2759
2760         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2761         if (!obj_request->osd_req)
2762                 goto out;
2763
2764         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2765                                         class_name, method_name);
2766         if (outbound_size) {
2767                 struct ceph_pagelist *pagelist;
2768
2769                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2770                 if (!pagelist)
2771                         goto out;
2772
2773                 ceph_pagelist_init(pagelist);
2774                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2775                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2776                                                 pagelist);
2777         }
2778         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2779                                         obj_request->pages, inbound_size,
2780                                         0, false, false);
2781         rbd_osd_req_format_read(obj_request);
2782
2783         ret = rbd_obj_request_submit(osdc, obj_request);
2784         if (ret)
2785                 goto out;
2786         ret = rbd_obj_request_wait(obj_request);
2787         if (ret)
2788                 goto out;
2789
2790         ret = obj_request->result;
2791         if (ret < 0)
2792                 goto out;
2793
2794         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2795         ret = (int)obj_request->xferred;
2796         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2797 out:
2798         if (obj_request)
2799                 rbd_obj_request_put(obj_request);
2800         else
2801                 ceph_release_page_vector(pages, page_count);
2802
2803         return ret;
2804 }
2805
2806 static void rbd_request_fn(struct request_queue *q)
2807                 __releases(q->queue_lock) __acquires(q->queue_lock)
2808 {
2809         struct rbd_device *rbd_dev = q->queuedata;
2810         bool read_only = rbd_dev->mapping.read_only;
2811         struct request *rq;
2812         int result;
2813
2814         while ((rq = blk_fetch_request(q))) {
2815                 bool write_request = rq_data_dir(rq) == WRITE;
2816                 struct rbd_img_request *img_request;
2817                 u64 offset;
2818                 u64 length;
2819
2820                 /* Ignore any non-FS requests that filter through. */
2821
2822                 if (rq->cmd_type != REQ_TYPE_FS) {
2823                         dout("%s: non-fs request type %d\n", __func__,
2824                                 (int) rq->cmd_type);
2825                         __blk_end_request_all(rq, 0);
2826                         continue;
2827                 }
2828
2829                 /* Ignore/skip any zero-length requests */
2830
2831                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2832                 length = (u64) blk_rq_bytes(rq);
2833
2834                 if (!length) {
2835                         dout("%s: zero-length request\n", __func__);
2836                         __blk_end_request_all(rq, 0);
2837                         continue;
2838                 }
2839
2840                 spin_unlock_irq(q->queue_lock);
2841
2842                 /* Disallow writes to a read-only device */
2843
2844                 if (write_request) {
2845                         result = -EROFS;
2846                         if (read_only)
2847                                 goto end_request;
2848                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2849                 }
2850
2851                 /*
2852                  * Quit early if the mapped snapshot no longer
2853                  * exists.  It's still possible the snapshot will
2854                  * have disappeared by the time our request arrives
2855                  * at the osd, but there's no sense in sending it if
2856                  * we already know.
2857                  */
2858                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2859                         dout("request for non-existent snapshot\n");
2860                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2861                         result = -ENXIO;
2862                         goto end_request;
2863                 }
2864
2865                 result = -EINVAL;
2866                 if (offset && length > U64_MAX - offset + 1) {
2867                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2868                                 offset, length);
2869                         goto end_request;       /* Shouldn't happen */
2870                 }
2871
2872                 result = -ENOMEM;
2873                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2874                                                         write_request, false);
2875                 if (!img_request)
2876                         goto end_request;
2877
2878                 img_request->rq = rq;
2879
2880                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2881                                                 rq->bio);
2882                 if (!result)
2883                         result = rbd_img_request_submit(img_request);
2884                 if (result)
2885                         rbd_img_request_put(img_request);
2886 end_request:
2887                 spin_lock_irq(q->queue_lock);
2888                 if (result < 0) {
2889                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2890                                 write_request ? "write" : "read",
2891                                 length, offset, result);
2892
2893                         __blk_end_request_all(rq, result);
2894                 }
2895         }
2896 }
2897
2898 /*
2899  * a queue callback. Makes sure that we don't create a bio that spans across
2900  * A queue callback.  Makes sure that we don't create a bio that spans
2901  * multiple osd objects.  One exception would be single-page bios,
2902  * which we handle later in bio_chain_clone_range().
2903 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2904                           struct bio_vec *bvec)
2905 {
2906         struct rbd_device *rbd_dev = q->queuedata;
2907         sector_t sector_offset;
2908         sector_t sectors_per_obj;
2909         sector_t obj_sector_offset;
2910         int ret;
2911
2912         /*
2913          * Convert the partition-relative bio start sector to an
2914          * offset relative to the enclosing device, then find how
2915          * far into its rbd object that sector falls.
2916          */
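        /*
         * Illustrative values: obj_order 22 (4 MiB objects) gives
         * sectors_per_obj = 8192; a bio starting at device sector
         * 8190 sits 2 sectors (1024 bytes) short of its object's end.
         */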
2917         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2918         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2919         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2920
2921         /*
2922          * Compute the number of bytes from that offset to the end
2923          * of the object.  Account for what's already used by the bio.
2924          */
2925         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2926         if (ret > bmd->bi_size)
2927                 ret -= bmd->bi_size;
2928         else
2929                 ret = 0;
2930
2931         /*
2932          * Don't send back more than was asked for.  And if the bio
2933          * was empty, let the whole thing through because:  "Note
2934          * that a block device *must* allow a single page to be
2935          * added to an empty bio."
2936          */
2937         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2938         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2939                 ret = (int) bvec->bv_len;
2940
2941         return ret;
2942 }
2943
2944 static void rbd_free_disk(struct rbd_device *rbd_dev)
2945 {
2946         struct gendisk *disk = rbd_dev->disk;
2947
2948         if (!disk)
2949                 return;
2950
2951         rbd_dev->disk = NULL;
2952         if (disk->flags & GENHD_FL_UP) {
2953                 del_gendisk(disk);
2954                 if (disk->queue)
2955                         blk_cleanup_queue(disk->queue);
2956         }
2957         put_disk(disk);
2958 }
2959
2960 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2961                                 const char *object_name,
2962                                 u64 offset, u64 length, void *buf)
2963 {
2965         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2966         struct rbd_obj_request *obj_request;
2967         struct page **pages = NULL;
2968         u32 page_count;
2969         size_t size;
2970         int ret;
2971
2972         page_count = (u32) calc_pages_for(offset, length);
2973         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2974         if (IS_ERR(pages))
2975                 return PTR_ERR(pages);
2976
2977         ret = -ENOMEM;
2978         obj_request = rbd_obj_request_create(object_name, offset, length,
2979                                                         OBJ_REQUEST_PAGES);
2980         if (!obj_request)
2981                 goto out;
2982
2983         obj_request->pages = pages;
2984         obj_request->page_count = page_count;
2985
2986         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2987         if (!obj_request->osd_req)
2988                 goto out;
2989
2990         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2991                                         offset, length, 0, 0);
2992         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2993                                         obj_request->pages,
2994                                         obj_request->length,
2995                                         obj_request->offset & ~PAGE_MASK,
2996                                         false, false);
2997         rbd_osd_req_format_read(obj_request);
2998
2999         ret = rbd_obj_request_submit(osdc, obj_request);
3000         if (ret)
3001                 goto out;
3002         ret = rbd_obj_request_wait(obj_request);
3003         if (ret)
3004                 goto out;
3005
3006         ret = obj_request->result;
3007         if (ret < 0)
3008                 goto out;
3009
3010         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3011         size = (size_t) obj_request->xferred;
3012         ceph_copy_from_page_vector(pages, buf, 0, size);
3013         rbd_assert(size <= (size_t)INT_MAX);
3014         ret = (int)size;
3015 out:
3016         if (obj_request)
3017                 rbd_obj_request_put(obj_request);
3018         else
3019                 ceph_release_page_vector(pages, page_count);
3020
3021         return ret;
3022 }
3023
3024 /*
3025  * Read the complete header for the given rbd device.
3026  *
3027  * Returns a pointer to a dynamically-allocated buffer containing
3028  * the complete and validated header.  The caller is responsible
3029  * for freeing the buffer.
3031  *
3032  * Returns a pointer-coded errno if a failure occurs.
3033  */
3034 static struct rbd_image_header_ondisk *
3035 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3036 {
3037         struct rbd_image_header_ondisk *ondisk = NULL;
3038         u32 snap_count = 0;
3039         u64 names_size = 0;
3040         u32 want_count;
3041         int ret;
3042
3043         /*
3044          * The complete header will include an array of its 64-bit
3045          * snapshot ids, followed by the names of those snapshots as
3046          * a contiguous block of NUL-terminated strings.  Note that
3047          * the number of snapshots could change by the time we read
3048          * it in, in which case we re-read it.
3049          */
3050         do {
3051                 size_t size;
3052
3053                 kfree(ondisk);
3054
3055                 size = sizeof (*ondisk);
3056                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3057                 size += names_size;
3058                 ondisk = kmalloc(size, GFP_KERNEL);
3059                 if (!ondisk)
3060                         return ERR_PTR(-ENOMEM);
3061
3062                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3063                                        0, size, ondisk);
3064                 if (ret < 0)
3065                         goto out_err;
3066                 if ((size_t)ret < size) {
3067                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3068                                 size, ret);
3069                         ret = -ENXIO;
3070                         goto out_err;
3071                 }
3072                 if (!rbd_dev_ondisk_valid(ondisk)) {
3073                         ret = -ENXIO;
3074                         rbd_warn(rbd_dev, "invalid header");
3075                         goto out_err;
3076                 }
3077
3078                 names_size = le64_to_cpu(ondisk->snap_names_len);
3079                 want_count = snap_count;
3080                 snap_count = le32_to_cpu(ondisk->snap_count);
3081         } while (snap_count != want_count);
3082
3083         return ondisk;
3084
3085 out_err:
3086         kfree(ondisk);
3087
3088         return ERR_PTR(ret);
3089 }
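
/*
 * Illustrative sketch, not compiled into the driver: the
 * allocate/read/compare loop above, reduced to its essentials for
 * any record whose true size is only learned by reading it.
 * read_record() is a hypothetical stand-in for rbd_obj_read_sync().
 */
#if 0
#include <stdio.h>
#include <stdlib.h>

/* Stand-in for the OSD read; pretends the record holds 3 items */
static void read_record(char *buf, size_t size, size_t *item_count)
{
	(void) buf;
	(void) size;
	*item_count = 3;
}

int main(void)
{
	char *buf = NULL;
	size_t want_count;
	size_t item_count = 0;

	do {
		want_count = item_count;
		free(buf);
		buf = malloc(16 + want_count * 8);	/* header + items */
		if (!buf)
			return 1;
		read_record(buf, 16 + want_count * 8, &item_count);
	} while (item_count != want_count);	/* count grew; re-read */

	printf("settled at %zu items\n", item_count);
	free(buf);
	return 0;
}
#endif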
3090
3091 /*
3092  * Reload the on-disk header and convert it to the in-memory format.
3093  */
3094 static int rbd_read_header(struct rbd_device *rbd_dev,
3095                            struct rbd_image_header *header)
3096 {
3097         struct rbd_image_header_ondisk *ondisk;
3098         int ret;
3099
3100         ondisk = rbd_dev_v1_header_read(rbd_dev);
3101         if (IS_ERR(ondisk))
3102                 return PTR_ERR(ondisk);
3103         ret = rbd_header_from_disk(header, ondisk);
3104         kfree(ondisk);
3105
3106         return ret;
3107 }
3108
3109 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3110 {
3111         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3112                 return;
3113
3114         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3115                 sector_t size;
3116
3117                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3118                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3119                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3120                 set_capacity(rbd_dev->disk, size);
3121         }
3122 }
3123
3124 /*
3125  * Re-read the v1 on-disk header and update the in-memory copy.
3126  */
3127 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3128 {
3129         int ret;
3130         struct rbd_image_header h;
3131
3132         ret = rbd_read_header(rbd_dev, &h);
3133         if (ret < 0)
3134                 return ret;
3135
3136         down_write(&rbd_dev->header_rwsem);
3137
3138         /* Update image size, and check for resize of mapped image */
3139         rbd_dev->header.image_size = h.image_size;
3140         rbd_update_mapping_size(rbd_dev);
3141
3142         /* rbd_dev->header.object_prefix shouldn't change */
3143         kfree(rbd_dev->header.snap_sizes);
3144         kfree(rbd_dev->header.snap_names);
3145         /* osd requests may still refer to snapc */
3146         ceph_put_snap_context(rbd_dev->header.snapc);
3147
3149         rbd_dev->header.snapc = h.snapc;
3150         rbd_dev->header.snap_names = h.snap_names;
3151         rbd_dev->header.snap_sizes = h.snap_sizes;
3152         /* Free the extra copy of the object prefix */
3153         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3154                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3155         kfree(h.object_prefix);
3156
3157         up_write(&rbd_dev->header_rwsem);
3158
3159         return ret;
3160 }
3161
3162 /*
3163  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3164  * has disappeared from the (just updated) snapshot context.
3165  */
3166 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3167 {
3168         u64 snap_id;
3169
3170         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3171                 return;
3172
3173         snap_id = rbd_dev->spec->snap_id;
3174         if (snap_id == CEPH_NOSNAP)
3175                 return;
3176
3177         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3178                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3179 }
3180
3181 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3182 {
3183         u64 image_size;
3184         int ret;
3185
3186         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3187         image_size = rbd_dev->header.image_size;
3188         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3189         if (rbd_dev->image_format == 1)
3190                 ret = rbd_dev_v1_refresh(rbd_dev);
3191         else
3192                 ret = rbd_dev_v2_refresh(rbd_dev);
3193
3194         /* If it's a mapped snapshot, validate its EXISTS flag */
3195
3196         rbd_exists_validate(rbd_dev);
3197         mutex_unlock(&ctl_mutex);
3198         if (ret)
3199                 rbd_warn(rbd_dev, "got notification but failed to "
3200                            "update snaps: %d\n", ret);
3201         if (image_size != rbd_dev->header.image_size)
3202                 revalidate_disk(rbd_dev->disk);
3203
3204         return ret;
3205 }
3206
3207 static int rbd_init_disk(struct rbd_device *rbd_dev)
3208 {
3209         struct gendisk *disk;
3210         struct request_queue *q;
3211         u64 segment_size;
3212
3213         /* create gendisk info */
3214         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3215         if (!disk)
3216                 return -ENOMEM;
3217
3218         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3219                  rbd_dev->dev_id);
3220         disk->major = rbd_dev->major;
3221         disk->first_minor = 0;
3222         disk->fops = &rbd_bd_ops;
3223         disk->private_data = rbd_dev;
3224
3225         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3226         if (!q)
3227                 goto out_disk;
3228
3229         /* We use the default size, but let's be explicit about it. */
3230         blk_queue_physical_block_size(q, SECTOR_SIZE);
3231
3232         /* set io sizes to object size */
3233         segment_size = rbd_obj_bytes(&rbd_dev->header);
3234         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3235         blk_queue_max_segment_size(q, segment_size);
3236         blk_queue_io_min(q, segment_size);
3237         blk_queue_io_opt(q, segment_size);
3238
3239         blk_queue_merge_bvec(q, rbd_merge_bvec);
3240         disk->queue = q;
3241
3242         q->queuedata = rbd_dev;
3243
3244         rbd_dev->disk = disk;
3245
3246         return 0;
3247 out_disk:
3248         put_disk(disk);
3249
3250         return -ENOMEM;
3251 }
3252
3253 /*
3254  * sysfs
3255  */
3256
3257 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3258 {
3259         return container_of(dev, struct rbd_device, dev);
3260 }
3261
3262 static ssize_t rbd_size_show(struct device *dev,
3263                              struct device_attribute *attr, char *buf)
3264 {
3265         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3266
3267         return sprintf(buf, "%llu\n",
3268                 (unsigned long long)rbd_dev->mapping.size);
3269 }
3270
3271 /*
3272  * Note this shows the features for whatever's mapped, which is not
3273  * necessarily the base image.
3274  */
3275 static ssize_t rbd_features_show(struct device *dev,
3276                              struct device_attribute *attr, char *buf)
3277 {
3278         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3279
3280         return sprintf(buf, "0x%016llx\n",
3281                         (unsigned long long)rbd_dev->mapping.features);
3282 }
3283
3284 static ssize_t rbd_major_show(struct device *dev,
3285                               struct device_attribute *attr, char *buf)
3286 {
3287         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3288
3289         if (rbd_dev->major)
3290                 return sprintf(buf, "%d\n", rbd_dev->major);
3291
3292         return sprintf(buf, "(none)\n");
3293 }
3295
3296 static ssize_t rbd_client_id_show(struct device *dev,
3297                                   struct device_attribute *attr, char *buf)
3298 {
3299         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3300
3301         return sprintf(buf, "client%lld\n",
3302                         ceph_client_id(rbd_dev->rbd_client->client));
3303 }
3304
3305 static ssize_t rbd_pool_show(struct device *dev,
3306                              struct device_attribute *attr, char *buf)
3307 {
3308         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3309
3310         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3311 }
3312
3313 static ssize_t rbd_pool_id_show(struct device *dev,
3314                              struct device_attribute *attr, char *buf)
3315 {
3316         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3317
3318         return sprintf(buf, "%llu\n",
3319                         (unsigned long long) rbd_dev->spec->pool_id);
3320 }
3321
3322 static ssize_t rbd_name_show(struct device *dev,
3323                              struct device_attribute *attr, char *buf)
3324 {
3325         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3326
3327         if (rbd_dev->spec->image_name)
3328                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3329
3330         return sprintf(buf, "(unknown)\n");
3331 }
3332
3333 static ssize_t rbd_image_id_show(struct device *dev,
3334                              struct device_attribute *attr, char *buf)
3335 {
3336         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3337
3338         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3339 }
3340
3341 /*
3342  * Shows the name of the currently-mapped snapshot (or
3343  * RBD_SNAP_HEAD_NAME for the base image).
3344  */
3345 static ssize_t rbd_snap_show(struct device *dev,
3346                              struct device_attribute *attr,
3347                              char *buf)
3348 {
3349         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3350
3351         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3352 }
3353
3354 /*
3355  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3356  * for the parent image.  If there is no parent, simply shows
3357  * "(no parent image)".
3358  */
3359 static ssize_t rbd_parent_show(struct device *dev,
3360                              struct device_attribute *attr,
3361                              char *buf)
3362 {
3363         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3364         struct rbd_spec *spec = rbd_dev->parent_spec;
3365         int count;
3366         char *bufp = buf;
3367
3368         if (!spec)
3369                 return sprintf(buf, "(no parent image)\n");
3370
3371         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3372                         (unsigned long long) spec->pool_id, spec->pool_name);
3373         if (count < 0)
3374                 return count;
3375         bufp += count;
3376
3377         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3378                         spec->image_name ? spec->image_name : "(unknown)");
3379         if (count < 0)
3380                 return count;
3381         bufp += count;
3382
3383         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3384                         (unsigned long long) spec->snap_id, spec->snap_name);
3385         if (count < 0)
3386                 return count;
3387         bufp += count;
3388
3389         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3390         if (count < 0)
3391                 return count;
3392         bufp += count;
3393
3394         return (ssize_t) (bufp - buf);
3395 }
3396
3397 static ssize_t rbd_image_refresh(struct device *dev,
3398                                  struct device_attribute *attr,
3399                                  const char *buf,
3400                                  size_t size)
3401 {
3402         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3403         int ret;
3404
3405         ret = rbd_dev_refresh(rbd_dev);
3406
3407         return ret < 0 ? ret : size;
3408 }
3409
3410 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3411 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3412 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3413 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3414 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3415 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3416 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3417 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3418 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3419 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3420 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3421
3422 static struct attribute *rbd_attrs[] = {
3423         &dev_attr_size.attr,
3424         &dev_attr_features.attr,
3425         &dev_attr_major.attr,
3426         &dev_attr_client_id.attr,
3427         &dev_attr_pool.attr,
3428         &dev_attr_pool_id.attr,
3429         &dev_attr_name.attr,
3430         &dev_attr_image_id.attr,
3431         &dev_attr_current_snap.attr,
3432         &dev_attr_parent.attr,
3433         &dev_attr_refresh.attr,
3434         NULL
3435 };
3436
3437 static struct attribute_group rbd_attr_group = {
3438         .attrs = rbd_attrs,
3439 };
3440
3441 static const struct attribute_group *rbd_attr_groups[] = {
3442         &rbd_attr_group,
3443         NULL
3444 };
3445
3446 static void rbd_sysfs_dev_release(struct device *dev)
3447 {
3448 }
3449
3450 static struct device_type rbd_device_type = {
3451         .name           = "rbd",
3452         .groups         = rbd_attr_groups,
3453         .release        = rbd_sysfs_dev_release,
3454 };
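
/*
 * Illustrative sketch, not compiled into the driver: reading one of
 * the attributes defined above from userspace.  The device id 0 and
 * the /sys/bus/rbd/devices path are assumptions for the example;
 * each mapped image appears under its own dev_id.
 */
#if 0
#include <stdio.h>

int main(void)
{
	char line[64];
	FILE *f = fopen("/sys/bus/rbd/devices/0/size", "r");

	if (!f)
		return 1;
	if (fgets(line, sizeof (line), f))
		printf("mapped size: %s", line);	/* bytes, per rbd_size_show() */
	fclose(f);
	return 0;
}
#endif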
3455
3456 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3457 {
3458         kref_get(&spec->kref);
3459
3460         return spec;
3461 }
3462
3463 static void rbd_spec_free(struct kref *kref);
3464 static void rbd_spec_put(struct rbd_spec *spec)
3465 {
3466         if (spec)
3467                 kref_put(&spec->kref, rbd_spec_free);
3468 }
3469
3470 static struct rbd_spec *rbd_spec_alloc(void)
3471 {
3472         struct rbd_spec *spec;
3473
3474         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3475         if (!spec)
3476                 return NULL;
3477         kref_init(&spec->kref);
3478
3479         return spec;
3480 }
3481
3482 static void rbd_spec_free(struct kref *kref)
3483 {
3484         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3485
3486         kfree(spec->pool_name);
3487         kfree(spec->image_id);
3488         kfree(spec->image_name);
3489         kfree(spec->snap_name);
3490         kfree(spec);
3491 }
3492
3493 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3494                                 struct rbd_spec *spec)
3495 {
3496         struct rbd_device *rbd_dev;
3497
3498         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3499         if (!rbd_dev)
3500                 return NULL;
3501
3502         spin_lock_init(&rbd_dev->lock);
3503         rbd_dev->flags = 0;
3504         INIT_LIST_HEAD(&rbd_dev->node);
3505         init_rwsem(&rbd_dev->header_rwsem);
3506
3507         rbd_dev->spec = spec;
3508         rbd_dev->rbd_client = rbdc;
3509
3510         /* Initialize the layout used for all rbd requests */
3511
3512         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3513         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3514         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3515         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3516
3517         return rbd_dev;
3518 }
3519
3520 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3521 {
3522         rbd_put_client(rbd_dev->rbd_client);
3523         rbd_spec_put(rbd_dev->spec);
3524         kfree(rbd_dev);
3525 }
3526
3527 /*
3528  * Get the size and object order for an image snapshot, or if
3529  * snap_id is CEPH_NOSNAP, gets this information for the base
3530  * image.
3531  */
3532 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3533                                 u8 *order, u64 *snap_size)
3534 {
3535         __le64 snapid = cpu_to_le64(snap_id);
3536         int ret;
3537         struct {
3538                 u8 order;
3539                 __le64 size;
3540         } __attribute__ ((packed)) size_buf = { 0 };
3541
3542         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3543                                 "rbd", "get_size",
3544                                 &snapid, sizeof (snapid),
3545                                 &size_buf, sizeof (size_buf));
3546         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3547         if (ret < 0)
3548                 return ret;
3549         if (ret < sizeof (size_buf))
3550                 return -ERANGE;
3551
3552         if (order)
3553                 *order = size_buf.order;
3554         *snap_size = le64_to_cpu(size_buf.size);
3555
3556         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3557                 (unsigned long long)snap_id, (unsigned int)size_buf.order,
3558                 (unsigned long long)*snap_size);
3559
3560         return 0;
3561 }
3562
3563 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3564 {
3565         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3566                                         &rbd_dev->header.obj_order,
3567                                         &rbd_dev->header.image_size);
3568 }
3569
3570 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3571 {
3572         void *reply_buf;
3573         int ret;
3574         void *p;
3575
3576         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3577         if (!reply_buf)
3578                 return -ENOMEM;
3579
3580         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3581                                 "rbd", "get_object_prefix", NULL, 0,
3582                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3583         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3584         if (ret < 0)
3585                 goto out;
3586
3587         p = reply_buf;
3588         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3589                                                 p + ret, NULL, GFP_NOIO);
3590         ret = 0;
3591
3592         if (IS_ERR(rbd_dev->header.object_prefix)) {
3593                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3594                 rbd_dev->header.object_prefix = NULL;
3595         } else {
3596                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3597         }
3598 out:
3599         kfree(reply_buf);
3600
3601         return ret;
3602 }
3603
3604 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3605                 u64 *snap_features)
3606 {
3607         __le64 snapid = cpu_to_le64(snap_id);
3608         struct {
3609                 __le64 features;
3610                 __le64 incompat;
3611         } __attribute__ ((packed)) features_buf = { 0 };
3612         u64 incompat;
3613         int ret;
3614
3615         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3616                                 "rbd", "get_features",
3617                                 &snapid, sizeof (snapid),
3618                                 &features_buf, sizeof (features_buf));
3619         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3620         if (ret < 0)
3621                 return ret;
3622         if (ret < sizeof (features_buf))
3623                 return -ERANGE;
3624
3625         incompat = le64_to_cpu(features_buf.incompat);
3626         if (incompat & ~RBD_FEATURES_SUPPORTED)
3627                 return -ENXIO;
3628
3629         *snap_features = le64_to_cpu(features_buf.features);
3630
3631         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3632                 (unsigned long long)snap_id,
3633                 (unsigned long long)*snap_features,
3634                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3635
3636         return 0;
3637 }
3638
3639 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3640 {
3641         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3642                                                 &rbd_dev->header.features);
3643 }
3644
3645 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3646 {
3647         struct rbd_spec *parent_spec;
3648         size_t size;
3649         void *reply_buf = NULL;
3650         __le64 snapid;
3651         void *p;
3652         void *end;
3653         char *image_id;
3654         u64 overlap;
3655         int ret;
3656
3657         parent_spec = rbd_spec_alloc();
3658         if (!parent_spec)
3659                 return -ENOMEM;
3660
3661         size = sizeof (__le64) +                                /* pool_id */
3662                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3663                 sizeof (__le64) +                               /* snap_id */
3664                 sizeof (__le64);                                /* overlap */
3665         reply_buf = kmalloc(size, GFP_KERNEL);
3666         if (!reply_buf) {
3667                 ret = -ENOMEM;
3668                 goto out_err;
3669         }
3670
3671         snapid = cpu_to_le64(CEPH_NOSNAP);
3672         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3673                                 "rbd", "get_parent",
3674                                 &snapid, sizeof (snapid),
3675                                 reply_buf, size);
3676         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3677         if (ret < 0)
3678                 goto out_err;
3679
3680         p = reply_buf;
3681         end = reply_buf + ret;
3682         ret = -ERANGE;
3683         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3684         if (parent_spec->pool_id == CEPH_NOPOOL)
3685                 goto out;       /* No parent?  No problem. */
3686
3687         /* The ceph file layout needs the pool id to fit in 32 bits */
3688
3689         ret = -EIO;
3690         if (parent_spec->pool_id > (u64)U32_MAX) {
3691                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3692                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3693                 goto out_err;
3694         }
3695
3696         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3697         if (IS_ERR(image_id)) {
3698                 ret = PTR_ERR(image_id);
3699                 goto out_err;
3700         }
3701         parent_spec->image_id = image_id;
3702         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3703         ceph_decode_64_safe(&p, end, overlap, out_err);
3704
3705         rbd_dev->parent_overlap = overlap;
3706         rbd_dev->parent_spec = parent_spec;
3707         parent_spec = NULL;     /* rbd_dev now owns this */
3708 out:
3709         ret = 0;
3710 out_err:
3711         kfree(reply_buf);
3712         rbd_spec_put(parent_spec);
3713
3714         return ret;
3715 }
3716
3717 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3718 {
3719         struct {
3720                 __le64 stripe_unit;
3721                 __le64 stripe_count;
3722         } __attribute__ ((packed)) striping_info_buf = { 0 };
3723         size_t size = sizeof (striping_info_buf);
3724         void *p;
3725         u64 obj_size;
3726         u64 stripe_unit;
3727         u64 stripe_count;
3728         int ret;
3729
3730         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3731                                 "rbd", "get_stripe_unit_count", NULL, 0,
3732                                 (char *)&striping_info_buf, size);
3733         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3734         if (ret < 0)
3735                 return ret;
3736         if (ret < size)
3737                 return -ERANGE;
3738
3739         /*
3740          * We don't actually support the "fancy striping" feature
3741          * (STRIPINGV2) yet, but if the striping sizes are the
3742          * defaults the behavior is the same as before.  So find
3743          * out, and only fail if the image has non-default values.
3744          */
3745         ret = -EINVAL;
3746         obj_size = (u64)1 << rbd_dev->header.obj_order;
3747         p = &striping_info_buf;
3748         stripe_unit = ceph_decode_64(&p);
3749         if (stripe_unit != obj_size) {
3750                 rbd_warn(rbd_dev, "unsupported stripe unit "
3751                                 "(got %llu want %llu)",
3752                                 stripe_unit, obj_size);
3753                 return -EINVAL;
3754         }
3755         stripe_count = ceph_decode_64(&p);
3756         if (stripe_count != 1) {
3757                 rbd_warn(rbd_dev, "unsupported stripe count "
3758                                 "(got %llu want 1)", stripe_count);
3759                 return -EINVAL;
3760         }
3761         rbd_dev->header.stripe_unit = stripe_unit;
3762         rbd_dev->header.stripe_count = stripe_count;
3763
3764         return 0;
3765 }
3766
3767 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3768 {
3769         size_t image_id_size;
3770         char *image_id;
3771         void *p;
3772         void *end;
3773         size_t size;
3774         void *reply_buf = NULL;
3775         size_t len = 0;
3776         char *image_name = NULL;
3777         int ret;
3778
3779         rbd_assert(!rbd_dev->spec->image_name);
3780
3781         len = strlen(rbd_dev->spec->image_id);
3782         image_id_size = sizeof (__le32) + len;
3783         image_id = kmalloc(image_id_size, GFP_KERNEL);
3784         if (!image_id)
3785                 return NULL;
3786
3787         p = image_id;
3788         end = image_id + image_id_size;
3789         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3790
3791         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3792         reply_buf = kmalloc(size, GFP_KERNEL);
3793         if (!reply_buf)
3794                 goto out;
3795
3796         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3797                                 "rbd", "dir_get_name",
3798                                 image_id, image_id_size,
3799                                 reply_buf, size);
3800         if (ret < 0)
3801                 goto out;
3802         p = reply_buf;
3803         end = reply_buf + ret;
3804
3805         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3806         if (IS_ERR(image_name))
3807                 image_name = NULL;
3808         else
3809                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3810 out:
3811         kfree(reply_buf);
3812         kfree(image_id);
3813
3814         return image_name;
3815 }
3816
3817 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3818 {
3819         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3820         const char *snap_name;
3821         u32 which = 0;
3822
3823         /* Skip over names until we find the one we are looking for */
3824
3825         snap_name = rbd_dev->header.snap_names;
3826         while (which < snapc->num_snaps) {
3827                 if (!strcmp(name, snap_name))
3828                         return snapc->snaps[which];
3829                 snap_name += strlen(snap_name) + 1;
3830                 which++;
3831         }
3832         return CEPH_NOSNAP;
3833 }
3834
3835 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3836 {
3837         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3838         u32 which;
3839         bool found = false;
3840         u64 snap_id;
3841
3842         for (which = 0; !found && which < snapc->num_snaps; which++) {
3843                 const char *snap_name;
3844
3845                 snap_id = snapc->snaps[which];
3846                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3847                 if (IS_ERR(snap_name))
3848                         break;
3849                 found = !strcmp(name, snap_name);
3850                 kfree(snap_name);
3851         }
3852         return found ? snap_id : CEPH_NOSNAP;
3853 }
3854
3855 /*
3856  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3857  * no snapshot by that name is found, or if an error occurs.
3858  */
3859 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3860 {
3861         if (rbd_dev->image_format == 1)
3862                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3863
3864         return rbd_v2_snap_id_by_name(rbd_dev, name);
3865 }
3866
3867 /*
3868  * When an rbd image has a parent image, it is identified by the
3869  * pool, image, and snapshot ids (not names).  This function fills
3870  * in the names for those ids.  (It's OK if we can't figure out the
3871  * name for an image id, but the pool and snapshot ids should always
3872  * exist and have names.)  All names in an rbd spec are dynamically
3873  * allocated.
3874  *
3875  * When an image being mapped (not a parent) is probed, we have the
3876  * pool name and pool id, image name and image id, and the snapshot
3877  * name.  The only thing we're missing is the snapshot id.
3878  */
3879 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3880 {
3881         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3882         struct rbd_spec *spec = rbd_dev->spec;
3883         const char *pool_name;
3884         const char *image_name;
3885         const char *snap_name;
3886         int ret;
3887
3888         /*
3889          * An image being mapped will have the pool name (etc.), but
3890          * we need to look up the snapshot id.
3891          */
3892         if (spec->pool_name) {
3893                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3894                         u64 snap_id;
3895
3896                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3897                         if (snap_id == CEPH_NOSNAP)
3898                                 return -ENOENT;
3899                         spec->snap_id = snap_id;
3900                 } else {
3901                         spec->snap_id = CEPH_NOSNAP;
3902                 }
3903
3904                 return 0;
3905         }
3906
3907         /* Get the pool name; we have to make our own copy of this */
3908
3909         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3910         if (!pool_name) {
3911                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3912                 return -EIO;
3913         }
3914         pool_name = kstrdup(pool_name, GFP_KERNEL);
3915         if (!pool_name)
3916                 return -ENOMEM;
3917
3918         /* Fetch the image name; tolerate failure here */
3919
3920         image_name = rbd_dev_image_name(rbd_dev);
3921         if (!image_name)
3922                 rbd_warn(rbd_dev, "unable to get image name");
3923
3924         /* Look up the snapshot name, and make a copy */
3925
3926         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3927         if (!snap_name) {
3928                 ret = -ENOMEM;
3929                 goto out_err;
3930         }
3931
3932         spec->pool_name = pool_name;
3933         spec->image_name = image_name;
3934         spec->snap_name = snap_name;
3935
3936         return 0;
3937 out_err:
3938         kfree(image_name);
3939         kfree(pool_name);
3940
3941         return ret;
3942 }
3943
3944 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3945 {
3946         size_t size;
3947         int ret;
3948         void *reply_buf;
3949         void *p;
3950         void *end;
3951         u64 seq;
3952         u32 snap_count;
3953         struct ceph_snap_context *snapc;
3954         u32 i;
3955
3956         /*
3957          * We'll need room for the seq value (maximum snapshot id),
3958          * snapshot count, and array of that many snapshot ids.
3959          * For now we have a fixed upper limit on the number we're
3960          * prepared to receive.
3961          */
3962         size = sizeof (__le64) + sizeof (__le32) +
3963                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3964         reply_buf = kzalloc(size, GFP_KERNEL);
3965         if (!reply_buf)
3966                 return -ENOMEM;
3967
3968         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3969                                 "rbd", "get_snapcontext", NULL, 0,
3970                                 reply_buf, size);
3971         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3972         if (ret < 0)
3973                 goto out;
3974
3975         p = reply_buf;
3976         end = reply_buf + ret;
3977         ret = -ERANGE;
3978         ceph_decode_64_safe(&p, end, seq, out);
3979         ceph_decode_32_safe(&p, end, snap_count, out);
3980
3981         /*
3982          * Make sure the reported number of snapshot ids wouldn't go
3983          * beyond the end of our buffer.  But before checking that,
3984          * make sure the computed size of the snapshot context we
3985          * allocate is representable in a size_t.
3986          */
3987         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3988                                  / sizeof (u64)) {
3989                 ret = -EINVAL;
3990                 goto out;
3991         }
3992         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3993                 goto out;
3994         ret = 0;
3995
3996         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3997         if (!snapc) {
3998                 ret = -ENOMEM;
3999                 goto out;
4000         }
4001         snapc->seq = seq;
4002         for (i = 0; i < snap_count; i++)
4003                 snapc->snaps[i] = ceph_decode_64(&p);
4004
4005         rbd_dev->header.snapc = snapc;
4006
4007         dout("  snap context seq = %llu, snap_count = %u\n",
4008                 (unsigned long long)seq, (unsigned int)snap_count);
4009 out:
4010         kfree(reply_buf);
4011
4012         return ret;
4013 }
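
/*
 * Illustrative sketch, not compiled into the driver: the snapshot
 * context wire format decoded above is a little-endian 64-bit seq,
 * a 32-bit snapshot count, then that many 64-bit snapshot ids.
 * get_le64()/get_le32() are hypothetical helpers, and the buffer is
 * assumed well formed (no bounds checks, unlike the real decode).
 */
#if 0
#include <stdint.h>
#include <stdio.h>

static uint64_t get_le64(const uint8_t **p)
{
	uint64_t v = 0;
	int i;

	for (i = 7; i >= 0; i--)
		v = (v << 8) | (*p)[i];
	*p += 8;
	return v;
}

static uint32_t get_le32(const uint8_t **p)
{
	uint32_t v = 0;
	int i;

	for (i = 3; i >= 0; i--)
		v = (v << 8) | (*p)[i];
	*p += 4;
	return v;
}

int main(void)
{
	/* seq = 5, snap_count = 2, snaps = { 4, 5 } */
	static const uint8_t buf[] = {
		5, 0, 0, 0, 0, 0, 0, 0,
		2, 0, 0, 0,
		4, 0, 0, 0, 0, 0, 0, 0,
		5, 0, 0, 0, 0, 0, 0, 0,
	};
	const uint8_t *p = buf;
	uint64_t seq = get_le64(&p);
	uint32_t count = get_le32(&p);
	uint32_t i;

	printf("seq %llu, %u snaps:", (unsigned long long) seq, count);
	for (i = 0; i < count; i++)
		printf(" %llu", (unsigned long long) get_le64(&p));
	printf("\n");
	return 0;
}
#endif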
4014
4015 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4016                                         u64 snap_id)
4017 {
4018         size_t size;
4019         void *reply_buf;
4020         __le64 snapid;
4021         int ret;
4022         void *p;
4023         void *end;
4024         char *snap_name;
4025
4026         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4027         reply_buf = kmalloc(size, GFP_KERNEL);
4028         if (!reply_buf)
4029                 return ERR_PTR(-ENOMEM);
4030
4031         snapid = cpu_to_le64(snap_id);
4032         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4033                                 "rbd", "get_snapshot_name",
4034                                 &snapid, sizeof (snapid),
4035                                 reply_buf, size);
4036         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4037         if (ret < 0) {
4038                 snap_name = ERR_PTR(ret);
4039                 goto out;
4040         }
4041
4042         p = reply_buf;
4043         end = reply_buf + ret;
4044         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4045         if (IS_ERR(snap_name))
4046                 goto out;
4047
4048         dout("  snap_id 0x%016llx snap_name = %s\n",
4049                 (unsigned long long)snap_id, snap_name);
4050 out:
4051         kfree(reply_buf);
4052
4053         return snap_name;
4054 }
4055
4056 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4057 {
4058         int ret;
4059
4060         down_write(&rbd_dev->header_rwsem);
4061
4062         ret = rbd_dev_v2_image_size(rbd_dev);
4063         if (ret)
4064                 goto out;
4065         rbd_update_mapping_size(rbd_dev);
4066
4067         ret = rbd_dev_v2_snap_context(rbd_dev);
4068         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4071 out:
4072         up_write(&rbd_dev->header_rwsem);
4073
4074         return ret;
4075 }
4076
4077 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4078 {
4079         struct device *dev;
4080         int ret;
4081
4082         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4083
4084         dev = &rbd_dev->dev;
4085         dev->bus = &rbd_bus_type;
4086         dev->type = &rbd_device_type;
4087         dev->parent = &rbd_root_dev;
4088         dev->release = rbd_dev_device_release;
4089         dev_set_name(dev, "%d", rbd_dev->dev_id);
4090         ret = device_register(dev);
4091
4092         mutex_unlock(&ctl_mutex);
4093
4094         return ret;
4095 }
4096
4097 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4098 {
4099         device_unregister(&rbd_dev->dev);
4100 }
4101
4102 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4103
4104 /*
4105  * Get a unique rbd identifier for the given new rbd_dev, and add
4106  * the rbd_dev to the global list.  The minimum rbd id is 1.
4107  */
4108 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4109 {
4110         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4111
4112         spin_lock(&rbd_dev_list_lock);
4113         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4114         spin_unlock(&rbd_dev_list_lock);
4115         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4116                 (unsigned long long) rbd_dev->dev_id);
4117 }
4118
4119 /*
4120  * Remove an rbd_dev from the global list, and record that its
4121  * identifier is no longer in use.
4122  */
4123 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4124 {
4125         struct list_head *tmp;
4126         int rbd_id = rbd_dev->dev_id;
4127         int max_id;
4128
4129         rbd_assert(rbd_id > 0);
4130
4131         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4132                 (unsigned long long) rbd_dev->dev_id);
4133         spin_lock(&rbd_dev_list_lock);
4134         list_del_init(&rbd_dev->node);
4135
4136         /*
4137          * If the id being "put" is not the current maximum, there
4138          * is nothing special we need to do.
4139          */
4140         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4141                 spin_unlock(&rbd_dev_list_lock);
4142                 return;
4143         }
4144
4145         /*
4146          * We need to update the current maximum id.  Search the
4147          * list to find out what it is.  We're more likely to find
4148          * the maximum at the end, so search the list backward.
4149          */
4150         max_id = 0;
4151         list_for_each_prev(tmp, &rbd_dev_list) {
4152                 struct rbd_device *rbd_dev;
4153
4154                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4155                 if (rbd_dev->dev_id > max_id)
4156                         max_id = rbd_dev->dev_id;
4157         }
4158         spin_unlock(&rbd_dev_list_lock);
4159
4160         /*
4161          * The max id could have been updated by rbd_dev_id_get(), in
4162          * which case it now accurately reflects the new maximum.
4163          * Be careful not to overwrite the maximum value in that
4164          * case.
4165          */
4166         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4167         dout("  max dev id has been reset\n");
4168 }
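
/*
 * Illustrative sketch, not compiled into the driver: the "only
 * lower the max if nobody raised it meanwhile" pattern behind the
 * atomic64_cmpxchg() above, written with C11 atomics in userspace.
 * id_put() is a hypothetical analogue of rbd_dev_id_put().
 */
#if 0
#include <stdatomic.h>
#include <stdio.h>

static _Atomic long long dev_id_max;

/* Called with the id being released and the recomputed list maximum */
static void id_put(long long released_id, long long new_max)
{
	long long expected = released_id;

	/* Fails harmlessly if another thread already bumped the max */
	atomic_compare_exchange_strong(&dev_id_max, &expected, new_max);
}

int main(void)
{
	atomic_store(&dev_id_max, 3);
	id_put(3, 2);	/* 3 was the max; lower it to 2 */
	printf("%lld\n", atomic_load(&dev_id_max));	/* prints 2 */
	return 0;
}
#endif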
4169
4170 /*
4171  * Skips over white space at *buf, and updates *buf to point to the
4172  * first found non-space character (if any). Returns the length of
4173  * the token (string of non-white space characters) found.  Note
4174  * that *buf must be terminated with '\0'.
4175  */
4176 static inline size_t next_token(const char **buf)
4177 {
4178         /*
4179          * These are the characters that produce nonzero for
4180          * isspace() in the "C" and "POSIX" locales.
4181          */
4182         const char *spaces = " \f\n\r\t\v";
4183
4184         *buf += strspn(*buf, spaces);   /* Find start of token */
4185
4186         return strcspn(*buf, spaces);   /* Return token length */
4187 }
4188
4189 /*
4190  * Finds the next token in *buf, and if the provided token buffer is
4191  * big enough, copies the found token into it.  The result, if
4192  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4193  * must be terminated with '\0' on entry.
4194  *
4195  * Returns the length of the token found (not including the '\0').
4196  * Return value will be 0 if no token is found, and it will be >=
4197  * token_size if the token would not fit.
4198  *
4199  * The *buf pointer will be updated to point beyond the end of the
4200  * found token.  Note that this occurs even if the token buffer is
4201  * too small to hold it.
4202  */
4203 static inline size_t copy_token(const char **buf,
4204                                 char *token,
4205                                 size_t token_size)
4206 {
4207         size_t len;
4208
4209         len = next_token(buf);
4210         if (len < token_size) {
4211                 memcpy(token, *buf, len);
4212                 *(token + len) = '\0';
4213         }
4214         *buf += len;
4215
4216         return len;
4217 }
4218
4219 /*
4220  * Finds the next token in *buf, dynamically allocates a buffer big
4221  * enough to hold a copy of it, and copies the token into the new
4222  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4223  * that a duplicate buffer is created even for a zero-length token.
4224  *
4225  * Returns a pointer to the newly-allocated duplicate, or a null
4226  * pointer if memory for the duplicate was not available.  If
4227  * the lenp argument is a non-null pointer, the length of the token
4228  * (not including the '\0') is returned in *lenp.
4229  *
4230  * If successful, the *buf pointer will be updated to point beyond
4231  * the end of the found token.
4232  *
4233  * Note: uses GFP_KERNEL for allocation.
4234  */
4235 static inline char *dup_token(const char **buf, size_t *lenp)
4236 {
4237         char *dup;
4238         size_t len;
4239
4240         len = next_token(buf);
4241         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4242         if (!dup)
4243                 return NULL;
4244         *(dup + len) = '\0';
4245         *buf += len;
4246
4247         if (lenp)
4248                 *lenp = len;
4249
4250         return dup;
4251 }
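
/*
 * Illustrative sketch, not compiled into the driver: next_token()
 * and dup_token() are thin wrappers around strspn()/strcspn().
 * This standalone userspace program walks an option string the way
 * rbd_add_parse_args() does; the string itself is a made-up example.
 */
#if 0
#include <stdio.h>
#include <string.h>

static size_t next_token(const char **buf)
{
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* find start of token */
	return strcspn(*buf, spaces);	/* return token length */
}

int main(void)
{
	const char *buf = "1.2.3.4:6789 name=admin rbd myimage";
	size_t len;

	while ((len = next_token(&buf)) != 0) {
		printf("token: %.*s\n", (int) len, buf);
		buf += len;
	}
	return 0;
}
#endif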
4252
4253 /*
4254  * Parse the options provided for an "rbd add" (i.e., rbd image
4255  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4256  * and the data written is passed here via a NUL-terminated buffer.
4257  * Returns 0 if successful or an error code otherwise.
4258  *
4259  * The information extracted from these options is recorded in
4260  * the other parameters which return dynamically-allocated
4261  * structures:
4262  *  ceph_opts
4263  *      The address of a pointer that will refer to a ceph options
4264  *      structure.  Caller must release the returned pointer using
4265  *      ceph_destroy_options() when it is no longer needed.
4266  *  rbd_opts
4267  *      Address of an rbd options pointer.  Fully initialized by
4268  *      this function; caller must release with kfree().
4269  *  spec
4270  *      Address of an rbd image specification pointer.  Fully
4271  *      initialized by this function based on parsed options.
4272  *      Caller must release with rbd_spec_put().
4273  *
4274  * The options passed take this form:
4275  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4276  * where:
4277  *  <mon_addrs>
4278  *      A comma-separated list of one or more monitor addresses.
4279  *      A monitor address is an ip address, optionally followed
4280  *      by a port number (separated by a colon).
4281  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4282  *  <options>
4283  *      A comma-separated list of ceph and/or rbd options.
4284  *  <pool_name>
4285  *      The name of the rados pool containing the rbd image.
4286  *  <image_name>
4287  *      The name of the image in that pool to map.
4288  *  <snap_id>
4289  *      An optional snapshot id.  If provided, the mapping will
4290  *      present data from the image at the time that snapshot was
4291  *      created.  The image head is used if no snapshot id is
4292  *      provided.  Snapshot mappings are always read-only.
4293  */
4294 static int rbd_add_parse_args(const char *buf,
4295                                 struct ceph_options **ceph_opts,
4296                                 struct rbd_options **opts,
4297                                 struct rbd_spec **rbd_spec)
4298 {
4299         size_t len;
4300         char *options;
4301         const char *mon_addrs;
4302         char *snap_name;
4303         size_t mon_addrs_size;
4304         struct rbd_spec *spec = NULL;
4305         struct rbd_options *rbd_opts = NULL;
4306         struct ceph_options *copts;
4307         int ret;
4308
4309         /* The first four tokens are required */
4310
4311         len = next_token(&buf);
4312         if (!len) {
4313                 rbd_warn(NULL, "no monitor address(es) provided");
4314                 return -EINVAL;
4315         }
4316         mon_addrs = buf;
4317         mon_addrs_size = len + 1;
4318         buf += len;
4319
4320         ret = -EINVAL;
4321         options = dup_token(&buf, NULL);
4322         if (!options)
4323                 return -ENOMEM;
4324         if (!*options) {
4325                 rbd_warn(NULL, "no options provided");
4326                 goto out_err;
4327         }
4328
4329         spec = rbd_spec_alloc();
4330         if (!spec)
4331                 goto out_mem;
4332
4333         spec->pool_name = dup_token(&buf, NULL);
4334         if (!spec->pool_name)
4335                 goto out_mem;
4336         if (!*spec->pool_name) {
4337                 rbd_warn(NULL, "no pool name provided");
4338                 goto out_err;
4339         }
4340
4341         spec->image_name = dup_token(&buf, NULL);
4342         if (!spec->image_name)
4343                 goto out_mem;
4344         if (!*spec->image_name) {
4345                 rbd_warn(NULL, "no image name provided");
4346                 goto out_err;
4347         }
4348
4349         /*
4350          * Snapshot name is optional; default is to use "-"
4351          * (indicating the head/no snapshot).
4352          */
4353         len = next_token(&buf);
4354         if (!len) {
4355                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4356                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4357         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4358                 ret = -ENAMETOOLONG;
4359                 goto out_err;
4360         }
4361         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4362         if (!snap_name)
4363                 goto out_mem;
4364         *(snap_name + len) = '\0';
4365         spec->snap_name = snap_name;
4366
4367         /* Initialize all rbd options to the defaults */
4368
4369         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4370         if (!rbd_opts)
4371                 goto out_mem;
4372
4373         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4374
4375         copts = ceph_parse_options(options, mon_addrs,
4376                                         mon_addrs + mon_addrs_size - 1,
4377                                         parse_rbd_opts_token, rbd_opts);
4378         if (IS_ERR(copts)) {
4379                 ret = PTR_ERR(copts);
4380                 goto out_err;
4381         }
4382         kfree(options);
4383
4384         *ceph_opts = copts;
4385         *opts = rbd_opts;
4386         *rbd_spec = spec;
4387
4388         return 0;
4389 out_mem:
4390         ret = -ENOMEM;
4391 out_err:
4392         kfree(rbd_opts);
4393         rbd_spec_put(spec);
4394         kfree(options);
4395
4396         return ret;
4397 }
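
/*
 * Illustrative sketch, not compiled into the driver: what an "rbd
 * add" request looks like from userspace.  The monitor address,
 * credentials, pool, and image names below are placeholders; the
 * /sys/bus/rbd/add node is the one described above.
 */
#if 0
#include <stdio.h>
#include <string.h>

int main(void)
{
	/* <mon_addrs> <options> <pool_name> <image_name> [<snap_name>] */
	const char *spec = "1.2.3.4:6789 name=admin rbd myimage";
	FILE *f = fopen("/sys/bus/rbd/add", "w");

	if (!f)
		return 1;
	fwrite(spec, 1, strlen(spec), f);
	return fclose(f) ? 1 : 0;
}
#endif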
4398
4399 /*
4400  * An rbd format 2 image has a unique identifier, distinct from the
4401  * name given to it by the user.  Internally, that identifier is
4402  * what's used to specify the names of objects related to the image.
4403  *
4404  * A special "rbd id" object is used to map an rbd image name to its
4405  * id.  If that object doesn't exist, then there is no v2 rbd image
4406  * with the supplied name.
4407  *
4408  * This function will record the given rbd_dev's image_id field if
4409  * it can be determined, and in that case will return 0.  If any
4410  * errors occur a negative errno will be returned and the rbd_dev's
4411  * image_id field will be unchanged (and should be NULL).
4412  */
4413 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4414 {
4415         int ret;
4416         size_t size;
4417         char *object_name;
4418         void *response;
4419         char *image_id;
4420
4421         /*
4422          * When probing a parent image, the image id is already
4423          * known (and the image name likely is not).  There's no
4424          * need to fetch the image id again in this case.  We
4425          * do still need to set the image format though.
4426          */
4427         if (rbd_dev->spec->image_id) {
4428                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4429
4430                 return 0;
4431         }
4432
4433         /*
4434          * First, see if the format 2 image id file exists, and if
4435          * so, get the image's persistent id from it.
4436          */
4437         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4438         object_name = kmalloc(size, GFP_NOIO);
4439         if (!object_name)
4440                 return -ENOMEM;
4441         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4442         dout("rbd id object name is %s\n", object_name);
4443
4444         /* Response will be an encoded string, which includes a length */
4445
4446         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4447         response = kzalloc(size, GFP_NOIO);
4448         if (!response) {
4449                 ret = -ENOMEM;
4450                 goto out;
4451         }
4452
4453         /* If it doesn't exist we'll assume it's a format 1 image */
4454
4455         ret = rbd_obj_method_sync(rbd_dev, object_name,
4456                                 "rbd", "get_id", NULL, 0,
4457                                 response, RBD_IMAGE_ID_LEN_MAX);
4458         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4459         if (ret == -ENOENT) {
4460                 image_id = kstrdup("", GFP_KERNEL);
4461                 ret = image_id ? 0 : -ENOMEM;
4462                 if (!ret)
4463                         rbd_dev->image_format = 1;
4464         } else if (ret < 0) {
4465                 goto out;
4466         } else if ((size_t) ret > sizeof (__le32)) {
4465                 void *p = response;
4466
4467                 image_id = ceph_extract_encoded_string(&p, p + ret,
4468                                                 NULL, GFP_NOIO);
4469                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4470                 if (!ret)
4471                         rbd_dev->image_format = 2;
4472         } else {
4473                 ret = -EINVAL;
4474         }
4475
4476         if (!ret) {
4477                 rbd_dev->spec->image_id = image_id;
4478                 dout("image_id is %s\n", image_id);
4479         }
4480 out:
4481         kfree(response);
4482         kfree(object_name);
4483
4484         return ret;
4485 }
4486
4487 /* Undo whatever state changes are made by v1 or v2 image probe */
4488
4489 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4490 {
4491         struct rbd_image_header *header;
4492
4493         rbd_dev_remove_parent(rbd_dev);
4494         rbd_spec_put(rbd_dev->parent_spec);
4495         rbd_dev->parent_spec = NULL;
4496         rbd_dev->parent_overlap = 0;
4497
4498         /* Free dynamic fields from the header, then zero it out */
4499
4500         header = &rbd_dev->header;
4501         ceph_put_snap_context(header->snapc);
4502         kfree(header->snap_sizes);
4503         kfree(header->snap_names);
4504         kfree(header->object_prefix);
4505         memset(header, 0, sizeof (*header));
4506 }
4507
4508 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4509 {
4510         int ret;
4511
4512         /* Populate rbd image metadata */
4513
4514         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4515         if (ret < 0)
4516                 goto out_err;
4517
4518         /* Version 1 images have no parent (no layering) */
4519
4520         rbd_dev->parent_spec = NULL;
4521         rbd_dev->parent_overlap = 0;
4522
4523         dout("discovered version 1 image, header name is %s\n",
4524                 rbd_dev->header_name);
4525
4526         return 0;
4527
4528 out_err:
4529         kfree(rbd_dev->header_name);
4530         rbd_dev->header_name = NULL;
4531         kfree(rbd_dev->spec->image_id);
4532         rbd_dev->spec->image_id = NULL;
4533
4534         return ret;
4535 }
4536
4537 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4538 {
4539         int ret;
4540
4541         ret = rbd_dev_v2_image_size(rbd_dev);
4542         if (ret)
4543                 goto out_err;
4544
4545         /* Get the object prefix (a.k.a. block_name) for the image */
4546
4547         ret = rbd_dev_v2_object_prefix(rbd_dev);
4548         if (ret)
4549                 goto out_err;
4550
4551         /* Get and check the features for the image */
4552
4553         ret = rbd_dev_v2_features(rbd_dev);
4554         if (ret)
4555                 goto out_err;
4556
4557         /* If the image supports layering, get the parent info */
4558
4559         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4560                 ret = rbd_dev_v2_parent_info(rbd_dev);
4561                 if (ret)
4562                         goto out_err;
4563
4564                 /*
4565                  * Don't print a warning for parent images.  We can
4566                  * tell we have a parent at this point because we
4567                  * won't yet know its pool name (just its pool id).
4568                  */
4569                 if (rbd_dev->spec->pool_name)
4570                         rbd_warn(rbd_dev, "WARNING: kernel layering is EXPERIMENTAL!");
4572         }
4573
4574         /* If the image supports fancy striping, get its parameters */
4575
4576         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4577                 ret = rbd_dev_v2_striping_info(rbd_dev);
4578                 if (ret < 0)
4579                         goto out_err;
4580         }
4581
4582         /* crypto and compression type aren't (yet) supported for v2 images */
4583
4584         rbd_dev->header.crypt_type = 0;
4585         rbd_dev->header.comp_type = 0;
4586
4587         /* Get the snapshot context */
4588
4589         ret = rbd_dev_v2_snap_context(rbd_dev);
4590         if (ret)
4591                 goto out_err;
4592
4593         dout("discovered version 2 image, header name is %s\n",
4594                 rbd_dev->header_name);
4595
4596         return 0;
4597 out_err:
4598         rbd_dev->parent_overlap = 0;
4599         rbd_spec_put(rbd_dev->parent_spec);
4600         rbd_dev->parent_spec = NULL;
4601         kfree(rbd_dev->header_name);
4602         rbd_dev->header_name = NULL;
4603         kfree(rbd_dev->header.object_prefix);
4604         rbd_dev->header.object_prefix = NULL;
4605
4606         return ret;
4607 }
4608
4609 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4610 {
4611         struct rbd_device *parent = NULL;
4612         struct rbd_spec *parent_spec;
4613         struct rbd_client *rbdc;
4614         int ret;
4615
4616         if (!rbd_dev->parent_spec)
4617                 return 0;
4618         /*
4619          * We need to pass a reference to the client and the parent
4620          * spec when creating the parent rbd_dev.  Images related by
4621          * parent/child relationships always share both.
4622          */
4623         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4624         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4625
4626         ret = -ENOMEM;
4627         parent = rbd_dev_create(rbdc, parent_spec);
4628         if (!parent)
4629                 goto out_err;
4630
4631         ret = rbd_dev_image_probe(parent);
4632         if (ret < 0)
4633                 goto out_err;
4634         rbd_dev->parent = parent;
4635
4636         return 0;
4637 out_err:
4638         if (parent) {
4639                 /* rbd_dev_destroy() drops the refs taken above; */
4640                 /* our caller frees header_name on this path.    */
4641                 rbd_dev_destroy(parent);
4642         } else {
4643                 rbd_put_client(rbdc);
4644                 rbd_spec_put(parent_spec);
4645         }
4646
4647         return ret;
4648 }
4649
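/*
 * Set up the Linux device side of a fully probed image: allocate a
 * device id and name, register a block device major, create the
 * gendisk and sysfs entries, then announce the disk.  The error paths
 * below unwind this in reverse order, as does (at removal time)
 * rbd_dev_device_release().
 */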
4650 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4651 {
4652         int ret;
4653
4654         ret = rbd_dev_mapping_set(rbd_dev);
4655         if (ret)
4656                 return ret;
4657
4658         /* generate unique id: find highest unique id, add one */
4659         rbd_dev_id_get(rbd_dev);
4660
4661         /* Fill in the device name, now that we have its id. */
4662         BUILD_BUG_ON(DEV_NAME_LEN
4663                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4664         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
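        /*
         * (MAX_INT_FORMAT_WIDTH over-estimates the decimal digits in
         * an int: each byte contributes log10(256), just under 5/2,
         * digits, and the +1 leaves room for a sign.  For a 4-byte
         * int that gives 11, enough for "-2147483648".)
         */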
4665
4666         /* Get our block major device number. */
4667
4668         ret = register_blkdev(0, rbd_dev->name);
4669         if (ret < 0)
4670                 goto err_out_id;
4671         rbd_dev->major = ret;
4672
4673         /* Set up the blkdev mapping. */
4674
4675         ret = rbd_init_disk(rbd_dev);
4676         if (ret)
4677                 goto err_out_blkdev;
4678
4679         ret = rbd_bus_add_dev(rbd_dev);
4680         if (ret)
4681                 goto err_out_disk;
4682
4683         /* Everything's ready.  Announce the disk to the world. */
4684
4685         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4686         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4687         add_disk(rbd_dev->disk);
4688
4689         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4690                 (unsigned long long) rbd_dev->mapping.size);
4691
4692         return ret;
4693
4694 err_out_disk:
4695         rbd_free_disk(rbd_dev);
4696 err_out_blkdev:
4697         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4698 err_out_id:
4699         rbd_dev_id_put(rbd_dev);
4700         rbd_dev_mapping_clear(rbd_dev);
4701
4702         return ret;
4703 }
4704
4705 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4706 {
4707         struct rbd_spec *spec = rbd_dev->spec;
4708         size_t size;
4709
4710         /* Record the header object name for this rbd image. */
4711
4712         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4713
4714         if (rbd_dev->image_format == 1)
4715                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4716         else
4717                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4718
4719         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4720         if (!rbd_dev->header_name)
4721                 return -ENOMEM;
4722
4723         if (rbd_dev->image_format == 1)
4724                 sprintf(rbd_dev->header_name, "%s%s",
4725                         spec->image_name, RBD_SUFFIX);
4726         else
4727                 sprintf(rbd_dev->header_name, "%s%s",
4728                         RBD_HEADER_PREFIX, spec->image_id);
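
        /*
         * e.g. a format 1 image named "foo" yields "foo.rbd", while a
         * format 2 image whose id is "a2b4c6" (an illustrative value)
         * yields "rbd_header.a2b4c6", per the RBD_SUFFIX and
         * RBD_HEADER_PREFIX definitions in rbd_types.h.
         */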
4729         return 0;
4730 }
4731
4732 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4733 {
4734         int ret;
4735
4736         rbd_dev_unprobe(rbd_dev);
4737         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4738         if (ret)
4739                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4740         kfree(rbd_dev->header_name);
4741         rbd_dev->header_name = NULL;
4742         rbd_dev->image_format = 0;
4743         kfree(rbd_dev->spec->image_id);
4744         rbd_dev->spec->image_id = NULL;
4745
4746         rbd_dev_destroy(rbd_dev);
4747 }
4748
4749 /*
4750  * Probe for the existence of the header object for the given rbd
4751  * device.  For format 2 images this includes determining the image
4752  * id.
4753  */
4754 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4755 {
4756         int ret;
4757         int tmp;
4758
4759         /*
4760          * Get the id from the image id object.  If it's not a
4761          * format 2 image, we'll get ENOENT back, and we'll assume
4762          * it's a format 1 image.
4763          */
4764         ret = rbd_dev_image_id(rbd_dev);
4765         if (ret)
4766                 return ret;
4767         rbd_assert(rbd_dev->spec->image_id);
4768         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4769
4770         ret = rbd_dev_header_name(rbd_dev);
4771         if (ret)
4772                 goto err_out_format;
4773
4774         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4775         if (ret)
4776                 goto out_header_name;
4777
4778         if (rbd_dev->image_format == 1)
4779                 ret = rbd_dev_v1_probe(rbd_dev);
4780         else
4781                 ret = rbd_dev_v2_probe(rbd_dev);
4782         if (ret)
4783                 goto err_out_watch;
4784
4785         ret = rbd_dev_spec_update(rbd_dev);
4786         if (ret)
4787                 goto err_out_probe;
4788
4789         ret = rbd_dev_probe_parent(rbd_dev);
4790         if (!ret)
4791                 return 0;
4792
4793 err_out_probe:
4794         rbd_dev_unprobe(rbd_dev);
4795 err_out_watch:
4796         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4797         if (tmp)
4798                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4799 out_header_name:
4800         kfree(rbd_dev->header_name);
4801         rbd_dev->header_name = NULL;
4802 err_out_format:
4803         rbd_dev->image_format = 0;
4804         kfree(rbd_dev->spec->image_id);
4805         rbd_dev->spec->image_id = NULL;
4806
4807         dout("probe failed, returning %d\n", ret);
4808
4809         return ret;
4810 }
4811
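/*
 * Handle a write to /sys/bus/rbd/add.  The buffer carries a mapping
 * specification as described in Documentation/ABI/testing/sysfs-bus-rbd,
 * roughly:
 *
 *      <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 *
 * so that, with illustrative values only,
 *
 *      $ echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * maps image "foo" from pool "rbd" as the next available /dev/rbd<N>.
 * Returns count on success or a negative errno on failure.
 */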
4812 static ssize_t rbd_add(struct bus_type *bus,
4813                        const char *buf,
4814                        size_t count)
4815 {
4816         struct rbd_device *rbd_dev = NULL;
4817         struct ceph_options *ceph_opts = NULL;
4818         struct rbd_options *rbd_opts = NULL;
4819         struct rbd_spec *spec = NULL;
4820         struct rbd_client *rbdc;
4821         struct ceph_osd_client *osdc;
4822         int rc = -ENOMEM;
4823
4824         if (!try_module_get(THIS_MODULE))
4825                 return -ENODEV;
4826
4827         /* parse add command */
4828         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4829         if (rc < 0)
4830                 goto err_out_module;
4831
4832         rbdc = rbd_get_client(ceph_opts);
4833         if (IS_ERR(rbdc)) {
4834                 rc = PTR_ERR(rbdc);
4835                 goto err_out_args;
4836         }
4837         ceph_opts = NULL;       /* rbd_dev client now owns this */
4838
4839         /* pick the pool */
4840         osdc = &rbdc->client->osdc;
4841         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4842         if (rc < 0)
4843                 goto err_out_client;
4844         spec->pool_id = (u64)rc;
4845
4846         /* The ceph file layout needs to fit pool id in 32 bits */
4847
4848         if (spec->pool_id > (u64)U32_MAX) {
4849                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4850                                 (unsigned long long)spec->pool_id, U32_MAX);
4851                 rc = -EIO;
4852                 goto err_out_client;
4853         }
4854
4855         rbd_dev = rbd_dev_create(rbdc, spec);
4856         if (!rbd_dev)
4857                 goto err_out_client;
4858         rbdc = NULL;            /* rbd_dev now owns this */
4859         spec = NULL;            /* rbd_dev now owns this */
4860
4861         rbd_dev->mapping.read_only = rbd_opts->read_only;
4862         kfree(rbd_opts);
4863         rbd_opts = NULL;        /* done with this */
4864
4865         rc = rbd_dev_image_probe(rbd_dev);
4866         if (rc < 0)
4867                 goto err_out_rbd_dev;
4868
4869         rc = rbd_dev_device_setup(rbd_dev);
4870         if (!rc)
4871                 return count;
4872
4873         rbd_dev_image_release(rbd_dev);
4874 err_out_rbd_dev:
4875         rbd_dev_destroy(rbd_dev);
4876 err_out_client:
4877         rbd_put_client(rbdc);
4878 err_out_args:
4879         if (ceph_opts)
4880                 ceph_destroy_options(ceph_opts);
4881         kfree(rbd_opts);
4882         rbd_spec_put(spec);
4883 err_out_module:
4884         module_put(THIS_MODULE);
4885
4886         dout("Error adding device %s\n", buf);
4887
4888         return (ssize_t)rc;
4889 }
4890
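/*
 * Find a mapped device by its id.  The device list lock is dropped
 * before returning, so the caller must otherwise keep the device from
 * going away (rbd_remove() does this by holding ctl_mutex across both
 * the lookup and the removal).
 */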
4891 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4892 {
4893         struct list_head *tmp;
4894         struct rbd_device *rbd_dev;
4895
4896         spin_lock(&rbd_dev_list_lock);
4897         list_for_each(tmp, &rbd_dev_list) {
4898                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4899                 if (rbd_dev->dev_id == dev_id) {
4900                         spin_unlock(&rbd_dev_list_lock);
4901                         return rbd_dev;
4902                 }
4903         }
4904         spin_unlock(&rbd_dev_list_lock);
4905         return NULL;
4906 }
4907
4908 static void rbd_dev_device_release(struct device *dev)
4909 {
4910         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4911
4912         rbd_free_disk(rbd_dev);
4913         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4914         rbd_dev_clear_mapping(rbd_dev);
4915         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4916         rbd_dev->major = 0;
4917         rbd_dev_id_put(rbd_dev);
4918         rbd_dev_mapping_clear(rbd_dev);
4919 }
4920
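/*
 * Release a device's chain of parent images.  Each pass removes the
 * most distant ancestor still present (the one with no parent of its
 * own), iterating rather than recursing, until rbd_dev itself has no
 * parent left.
 */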
4921 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4922 {
4923         while (rbd_dev->parent) {
4924                 struct rbd_device *first = rbd_dev;
4925                 struct rbd_device *second = first->parent;
4926                 struct rbd_device *third;
4927
4928                 /*
4929                  * Follow to the parent with no grandparent and
4930                  * remove it.
4931                  */
4932                 while (second && (third = second->parent)) {
4933                         first = second;
4934                         second = third;
4935                 }
4936                 rbd_assert(second);
4937                 rbd_dev_image_release(second);
4938                 first->parent = NULL;
4939                 first->parent_overlap = 0;
4940
4941                 rbd_assert(first->parent_spec);
4942                 rbd_spec_put(first->parent_spec);
4943                 first->parent_spec = NULL;
4944         }
4945 }
4946
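/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the decimal
 * id of the mapping to tear down; for a device added as /dev/rbd1:
 *
 *      $ echo 1 > /sys/bus/rbd/remove
 *
 * Fails with -ENOENT if no such mapping exists, and with -EBUSY if the
 * device is still open.
 */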
4947 static ssize_t rbd_remove(struct bus_type *bus,
4948                           const char *buf,
4949                           size_t count)
4950 {
4951         struct rbd_device *rbd_dev = NULL;
4952         int target_id;
4953         unsigned long ul;
4954         int ret;
4955
4956         ret = kstrtoul(buf, 10, &ul);
4957         if (ret)
4958                 return ret;
4959
4960         /* convert to int; abort if we lost anything in the conversion */
4961         target_id = (int) ul;
4962         if (target_id != ul)
4963                 return -EINVAL;
4964
4965         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4966
4967         rbd_dev = __rbd_get_dev(target_id);
4968         if (!rbd_dev) {
4969                 ret = -ENOENT;
4970                 goto done;
4971         }
4972
4973         spin_lock_irq(&rbd_dev->lock);
4974         if (rbd_dev->open_count)
4975                 ret = -EBUSY;
4976         else
4977                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4978         spin_unlock_irq(&rbd_dev->lock);
4979         if (ret < 0)
4980                 goto done;
4981         ret = count;
4982         rbd_bus_del_dev(rbd_dev);
4983         rbd_dev_image_release(rbd_dev);
4984         module_put(THIS_MODULE);
4985 done:
4986         mutex_unlock(&ctl_mutex);
4987
4988         return ret;
4989 }
4990
4991 /*
4992  * create control files in sysfs
4993  * /sys/bus/rbd/...
4994  */
4995 static int rbd_sysfs_init(void)
4996 {
4997         int ret;
4998
4999         ret = device_register(&rbd_root_dev);
5000         if (ret < 0)
5001                 return ret;
5002
5003         ret = bus_register(&rbd_bus_type);
5004         if (ret < 0)
5005                 device_unregister(&rbd_root_dev);
5006
5007         return ret;
5008 }
5009
5010 static void rbd_sysfs_cleanup(void)
5011 {
5012         bus_unregister(&rbd_bus_type);
5013         device_unregister(&rbd_root_dev);
5014 }
5015
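/*
 * Create the slab caches the driver allocates from: one each for
 * image requests, object requests, and segment names.  Either all
 * three are created or, on failure, none are left behind.
 */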
5016 static int rbd_slab_init(void)
5017 {
5018         rbd_assert(!rbd_img_request_cache);
5019         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5020                                         sizeof (struct rbd_img_request),
5021                                         __alignof__(struct rbd_img_request),
5022                                         0, NULL);
5023         if (!rbd_img_request_cache)
5024                 return -ENOMEM;
5025
5026         rbd_assert(!rbd_obj_request_cache);
5027         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5028                                         sizeof (struct rbd_obj_request),
5029                                         __alignof__(struct rbd_obj_request),
5030                                         0, NULL);
5031         if (!rbd_obj_request_cache)
5032                 goto out_err;
5033
5034         rbd_assert(!rbd_segment_name_cache);
5035         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5036                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5037         if (rbd_segment_name_cache)
5038                 return 0;
5039 out_err:
5040         if (rbd_obj_request_cache) {
5041                 kmem_cache_destroy(rbd_obj_request_cache);
5042                 rbd_obj_request_cache = NULL;
5043         }
5044
5045         kmem_cache_destroy(rbd_img_request_cache);
5046         rbd_img_request_cache = NULL;
5047
5048         return -ENOMEM;
5049 }
5050
5051 static void rbd_slab_exit(void)
5052 {
5053         rbd_assert(rbd_segment_name_cache);
5054         kmem_cache_destroy(rbd_segment_name_cache);
5055         rbd_segment_name_cache = NULL;
5056
5057         rbd_assert(rbd_obj_request_cache);
5058         kmem_cache_destroy(rbd_obj_request_cache);
5059         rbd_obj_request_cache = NULL;
5060
5061         rbd_assert(rbd_img_request_cache);
5062         kmem_cache_destroy(rbd_img_request_cache);
5063         rbd_img_request_cache = NULL;
5064 }
5065
5066 static int __init rbd_init(void)
5067 {
5068         int rc;
5069
5070         if (!libceph_compatible(NULL)) {
5071                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5072
5073                 return -EINVAL;
5074         }
5075         rc = rbd_slab_init();
5076         if (rc)
5077                 return rc;
5078         rc = rbd_sysfs_init();
5079         if (rc)
5080                 rbd_slab_exit();
5081         else
5082                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5083
5084         return rc;
5085 }
5086
5087 static void __exit rbd_exit(void)
5088 {
5089         rbd_sysfs_cleanup();
5090         rbd_slab_exit();
5091 }
5092
5093 module_init(rbd_init);
5094 module_exit(rbd_exit);
5095
5096 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5097 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5098 MODULE_DESCRIPTION("rados block device");
5099
5100 /* following authorship retained from original osdblk.c */
5101 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5102
5103 MODULE_LICENSE("GPL");