
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
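
/*
 * Illustrative arithmetic (not from the original source): each byte
 * contributes at most log10(256) ~= 2.41 decimal digits, so 5/2 digits
 * per byte over-estimates safely.  For a 4-byte int this gives
 * (5 * 4) / 2 + 1 = 11 characters, exactly enough for "-2147483648",
 * which fits in DEV_NAME_LEN together with the "rbd" prefix.
 */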

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};
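
/*
 * For illustration (summarizing the comment above, not new semantics),
 * the three object request flavors are distinguishable like so:
 *
 *      standalone:       which == BAD_WHICH, obj_request == NULL
 *      existence check:  which == BAD_WHICH, obj_request != NULL
 *      image data:       which != BAD_WHICH, img_request != NULL
 */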

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
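
/*
 * Usage sketch (illustrative only): walking an image request's object
 * requests with the iterators above.
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              dout("obj %p which %u\n", obj_request, obj_request->which);
 *
 * The _safe variant walks in reverse and tolerates removal of the
 * current entry, which is why teardown paths would use it.
 */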

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
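
/*
 * For illustration (hypothetical input, not from the original source):
 * given the option string "read_only" (or its alternate spelling
 * "ro"), match_token() yields Opt_read_only and the switch in
 * parse_rbd_opts_token() sets rbd_opts->read_only = true;
 * "read_write"/"rw" clears it again.
 */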

/*
 * Get a ceph client with specific addr and configuration; create a
 * new one if an existing client is not found.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}
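
/*
 * Illustrative flow (assumed typical, not guaranteed by this file
 * alone): two "rbd map" invocations naming the same monitors and
 * options end up sharing one struct rbd_client.  The first call
 * misses in rbd_client_find() and creates the client; the second
 * finds it, takes a kref via __rbd_get_client(), and its now-unneeded
 * ceph_options are destroyed.
 */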

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail from here on, so fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}
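
/*
 * Illustrative layout note (inferred from the code above, not a new
 * guarantee): a format 1 header is followed by snap_count
 * rbd_image_snap_ondisk entries (id + size), then snap_names_len
 * bytes of NUL-terminated names packed back to back.  That packing
 * is what lets _rbd_dev_v1_snap_name() below walk the names by
 * strlen() + 1.
 */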

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
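
/*
 * Worked example (illustrative): with snapshot ids stored in
 * descending order, say { 12, 7, 3 }, snapid_compare_reverse() on
 * ids 7 and 12 returns 1 ("7 sorts after 12"), which is what lets
 * bsearch() probe the reverse-sorted array directly.
 */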

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return NULL;

        return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* name came from the segment name cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
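
/*
 * Worked example (illustrative, assuming an obj_order of 22, i.e.
 * 4 MiB objects): an image I/O at offset 0x3ff000 of length 0x2000
 * lands in segment 0; rbd_segment_offset() gives 0x3ff000 and
 * rbd_segment_length() clips the length to 0x1000 so the request
 * does not cross into segment 1.
 */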

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
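
/*
 * Usage sketch (hypothetical caller, for illustration only): carving
 * per-object clones off a block request's bio chain.  Each call
 * consumes "length" bytes and leaves bio_list/offset pointing at the
 * first unconsumed byte, ready for the next object request.
 *
 *      struct bio *bio_list = rq->bio;
 *      unsigned int offset = 0;
 *      struct bio *clone;
 *
 *      clone = bio_chain_clone_range(&bio_list, &offset, length,
 *                                    GFP_ATOMIC);
 */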

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}
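
/*
 * Illustrative refcount note (restating the comments above, not a new
 * rule): the reference a caller holds on an object request is handed
 * over to the image request by rbd_img_obj_request_add(); the
 * matching rbd_obj_request_put() is issued by
 * rbd_img_obj_request_del(), so an object request normally goes away
 * when it is unlinked from its image request.
 */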

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}
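
/*
 * Worked example (illustrative): a 4 MiB object read that returns
 * -ENOENT (a hole in a sparse image) is zero-filled for the full
 * 4 MiB and reported as fully transferred; a short read returning
 * only 1 MiB zero-fills the remaining 3 MiB the same way.
 */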

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

1577 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1578 {
1579         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1580                 obj_request->result, obj_request->length);
1581         /*
1582          * There is no such thing as a successful short write.  Set
1583          * it to our originally-requested length.
1584          */
1585         obj_request->xferred = obj_request->length;
1586         obj_request_done_set(obj_request);
1587 }
1588
1589 /*
1590  * For a simple stat call there's nothing to do.  We'll do more if
1591  * this is part of a write sequence for a layered image.
1592  */
1593 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1594 {
1595         dout("%s: obj %p\n", __func__, obj_request);
1596         obj_request_done_set(obj_request);
1597 }
1598
1599 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1600                                 struct ceph_msg *msg)
1601 {
1602         struct rbd_obj_request *obj_request = osd_req->r_priv;
1603         u16 opcode;
1604
1605         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1606         rbd_assert(osd_req == obj_request->osd_req);
1607         if (obj_request_img_data_test(obj_request)) {
1608                 rbd_assert(obj_request->img_request);
1609                 rbd_assert(obj_request->which != BAD_WHICH);
1610         } else {
1611                 rbd_assert(obj_request->which == BAD_WHICH);
1612         }
1613
1614         if (osd_req->r_result < 0)
1615                 obj_request->result = osd_req->r_result;
1616
1617         BUG_ON(osd_req->r_num_ops > 2);
1618
1619         /*
1620          * We support a 64-bit length, but ultimately it has to be
1621          * passed to blk_end_request(), which takes an unsigned int.
1622          */
1623         obj_request->xferred = osd_req->r_reply_op_len[0];
1624         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1625         opcode = osd_req->r_ops[0].op;
1626         switch (opcode) {
1627         case CEPH_OSD_OP_READ:
1628                 rbd_osd_read_callback(obj_request);
1629                 break;
1630         case CEPH_OSD_OP_WRITE:
1631                 rbd_osd_write_callback(obj_request);
1632                 break;
1633         case CEPH_OSD_OP_STAT:
1634                 rbd_osd_stat_callback(obj_request);
1635                 break;
1636         case CEPH_OSD_OP_CALL:
1637         case CEPH_OSD_OP_NOTIFY_ACK:
1638         case CEPH_OSD_OP_WATCH:
1639                 rbd_osd_trivial_callback(obj_request);
1640                 break;
1641         default:
1642                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1643                         obj_request->object_name, (unsigned short) opcode);
1644                 break;
1645         }
1646
1647         if (obj_request_done_test(obj_request))
1648                 rbd_obj_request_complete(obj_request);
1649 }
1650
1651 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1652 {
1653         struct rbd_img_request *img_request = obj_request->img_request;
1654         struct ceph_osd_request *osd_req = obj_request->osd_req;
1655         u64 snap_id;
1656
1657         rbd_assert(osd_req != NULL);
1658
1659         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1660         ceph_osdc_build_request(osd_req, obj_request->offset,
1661                         NULL, snap_id, NULL);
1662 }
1663
1664 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1665 {
1666         struct rbd_img_request *img_request = obj_request->img_request;
1667         struct ceph_osd_request *osd_req = obj_request->osd_req;
1668         struct ceph_snap_context *snapc;
1669         struct timespec mtime = CURRENT_TIME;
1670
1671         rbd_assert(osd_req != NULL);
1672
1673         snapc = img_request ? img_request->snapc : NULL;
1674         ceph_osdc_build_request(osd_req, obj_request->offset,
1675                         snapc, CEPH_NOSNAP, &mtime);
1676 }
1677
1678 static struct ceph_osd_request *rbd_osd_req_create(
1679                                         struct rbd_device *rbd_dev,
1680                                         bool write_request,
1681                                         struct rbd_obj_request *obj_request)
1682 {
1683         struct ceph_snap_context *snapc = NULL;
1684         struct ceph_osd_client *osdc;
1685         struct ceph_osd_request *osd_req;
1686
1687         if (obj_request_img_data_test(obj_request)) {
1688                 struct rbd_img_request *img_request = obj_request->img_request;
1689
1690                 rbd_assert(write_request ==
1691                                 img_request_write_test(img_request));
1692                 if (write_request)
1693                         snapc = img_request->snapc;
1694         }
1695
1696         /* Allocate and initialize the request, for the single op */
1697
1698         osdc = &rbd_dev->rbd_client->client->osdc;
1699         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1700         if (!osd_req)
1701                 return NULL;    /* ENOMEM */
1702
1703         if (write_request)
1704                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1705         else
1706                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1707
1708         osd_req->r_callback = rbd_osd_req_callback;
1709         osd_req->r_priv = obj_request;
1710
1711         osd_req->r_oid_len = strlen(obj_request->object_name);
1712         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1713         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1714
1715         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1716
1717         return osd_req;
1718 }
1719
1720 /*
1721  * Create a copyup osd request based on the information in the
1722  * object request supplied.  A copyup request has two osd ops,
1723  * a copyup method call, and a "normal" write request.
1724  */
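/*
 * Sketch of the resulting two-op request, based on how it is filled
 * in by rbd_img_obj_parent_read_full_callback() below:
 *
 *      op 0: CEPH_OSD_OP_CALL  class "rbd", method "copyup",
 *            request data = pages read from the parent image
 *      op 1: CEPH_OSD_OP_WRITE original offset/length,
 *            data = the original write payload
 */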
1725 static struct ceph_osd_request *
1726 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1727 {
1728         struct rbd_img_request *img_request;
1729         struct ceph_snap_context *snapc;
1730         struct rbd_device *rbd_dev;
1731         struct ceph_osd_client *osdc;
1732         struct ceph_osd_request *osd_req;
1733
1734         rbd_assert(obj_request_img_data_test(obj_request));
1735         img_request = obj_request->img_request;
1736         rbd_assert(img_request);
1737         rbd_assert(img_request_write_test(img_request));
1738
1739         /* Allocate and initialize the request, for the two ops */
1740
1741         snapc = img_request->snapc;
1742         rbd_dev = img_request->rbd_dev;
1743         osdc = &rbd_dev->rbd_client->client->osdc;
1744         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1745         if (!osd_req)
1746                 return NULL;    /* ENOMEM */
1747
1748         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1749         osd_req->r_callback = rbd_osd_req_callback;
1750         osd_req->r_priv = obj_request;
1751
1752         osd_req->r_oid_len = strlen(obj_request->object_name);
1753         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1754         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1755
1756         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1757
1758         return osd_req;
1759 }
1760
1762 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1763 {
1764         ceph_osdc_put_request(osd_req);
1765 }
1766
1767 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1768
1769 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1770                                                 u64 offset, u64 length,
1771                                                 enum obj_request_type type)
1772 {
1773         struct rbd_obj_request *obj_request;
1774         size_t size;
1775         char *name;
1776
1777         rbd_assert(obj_request_type_valid(type));
1778
1779         size = strlen(object_name) + 1;
1780         name = kmalloc(size, GFP_KERNEL);
1781         if (!name)
1782                 return NULL;
1783
1784         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1785         if (!obj_request) {
1786                 kfree(name);
1787                 return NULL;
1788         }
1789
1790         obj_request->object_name = memcpy(name, object_name, size);
1791         obj_request->offset = offset;
1792         obj_request->length = length;
1793         obj_request->flags = 0;
1794         obj_request->which = BAD_WHICH;
1795         obj_request->type = type;
1796         INIT_LIST_HEAD(&obj_request->links);
1797         init_completion(&obj_request->completion);
1798         kref_init(&obj_request->kref);
1799
1800         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1801                 offset, length, (int)type, obj_request);
1802
1803         return obj_request;
1804 }
1805
1806 static void rbd_obj_request_destroy(struct kref *kref)
1807 {
1808         struct rbd_obj_request *obj_request;
1809
1810         obj_request = container_of(kref, struct rbd_obj_request, kref);
1811
1812         dout("%s: obj %p\n", __func__, obj_request);
1813
1814         rbd_assert(obj_request->img_request == NULL);
1815         rbd_assert(obj_request->which == BAD_WHICH);
1816
1817         if (obj_request->osd_req)
1818                 rbd_osd_req_destroy(obj_request->osd_req);
1819
1820         rbd_assert(obj_request_type_valid(obj_request->type));
1821         switch (obj_request->type) {
1822         case OBJ_REQUEST_NODATA:
1823                 break;          /* Nothing to do */
1824         case OBJ_REQUEST_BIO:
1825                 if (obj_request->bio_list)
1826                         bio_chain_put(obj_request->bio_list);
1827                 break;
1828         case OBJ_REQUEST_PAGES:
1829                 if (obj_request->pages)
1830                         ceph_release_page_vector(obj_request->pages,
1831                                                 obj_request->page_count);
1832                 break;
1833         }
1834
1835         kfree(obj_request->object_name);
1836         obj_request->object_name = NULL;
1837         kmem_cache_free(rbd_obj_request_cache, obj_request);
1838 }
1839
1840 /*
1841  * Caller is responsible for filling in the list of object requests
1842  * that comprises the image request, and the Linux request pointer
1843  * (if there is one).
1844  */
1845 static struct rbd_img_request *rbd_img_request_create(
1846                                         struct rbd_device *rbd_dev,
1847                                         u64 offset, u64 length,
1848                                         bool write_request,
1849                                         bool child_request)
1850 {
1851         struct rbd_img_request *img_request;
1852
1853         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1854         if (!img_request)
1855                 return NULL;
1856
1857         if (write_request) {
1858                 down_read(&rbd_dev->header_rwsem);
1859                 ceph_get_snap_context(rbd_dev->header.snapc);
1860                 up_read(&rbd_dev->header_rwsem);
1861         }
1862
1863         img_request->rq = NULL;
1864         img_request->rbd_dev = rbd_dev;
1865         img_request->offset = offset;
1866         img_request->length = length;
1867         img_request->flags = 0;
1868         if (write_request) {
1869                 img_request_write_set(img_request);
1870                 img_request->snapc = rbd_dev->header.snapc;
1871         } else {
1872                 img_request->snap_id = rbd_dev->spec->snap_id;
1873         }
1874         if (child_request)
1875                 img_request_child_set(img_request);
1876         if (rbd_dev->parent_overlap)
1877                 img_request_layered_set(img_request);
1878         spin_lock_init(&img_request->completion_lock);
1879         img_request->next_completion = 0;
1880         img_request->callback = NULL;
1881         img_request->result = 0;
1882         img_request->obj_request_count = 0;
1883         INIT_LIST_HEAD(&img_request->obj_requests);
1884         kref_init(&img_request->kref);
1885
1886         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1887                 write_request ? "write" : "read", offset, length,
1888                 img_request);
1889
1890         return img_request;
1891 }
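/*
 * Typical lifecycle of an image request, sketched after the use in
 * rbd_request_fn() below (error handling abbreviated):
 *
 *      img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                              write_request, false);
 *      if (!img_request)
 *              return -ENOMEM;
 *      img_request->rq = rq;
 *      result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *                                      rq->bio);
 *      if (!result)
 *              result = rbd_img_request_submit(img_request);
 *      if (result)
 *              rbd_img_request_put(img_request);
 */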
1892
1893 static void rbd_img_request_destroy(struct kref *kref)
1894 {
1895         struct rbd_img_request *img_request;
1896         struct rbd_obj_request *obj_request;
1897         struct rbd_obj_request *next_obj_request;
1898
1899         img_request = container_of(kref, struct rbd_img_request, kref);
1900
1901         dout("%s: img %p\n", __func__, img_request);
1902
1903         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1904                 rbd_img_obj_request_del(img_request, obj_request);
1905         rbd_assert(img_request->obj_request_count == 0);
1906
1907         if (img_request_write_test(img_request))
1908                 ceph_put_snap_context(img_request->snapc);
1909
1910         if (img_request_child_test(img_request))
1911                 rbd_obj_request_put(img_request->obj_request);
1912
1913         kmem_cache_free(rbd_img_request_cache, img_request);
1914 }
1915
1916 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1917 {
1918         struct rbd_img_request *img_request;
1919         unsigned int xferred;
1920         int result;
1921         bool more;
1922
1923         rbd_assert(obj_request_img_data_test(obj_request));
1924         img_request = obj_request->img_request;
1925
1926         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1927         xferred = (unsigned int)obj_request->xferred;
1928         result = obj_request->result;
1929         if (result) {
1930                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1931
1932                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1933                         img_request_write_test(img_request) ? "write" : "read",
1934                         obj_request->length, obj_request->img_offset,
1935                         obj_request->offset);
1936                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1937                         result, xferred);
1938                 if (!img_request->result)
1939                         img_request->result = result;
1940         }
1941
1942         /* Image object requests don't own their page array */
1943
1944         if (obj_request->type == OBJ_REQUEST_PAGES) {
1945                 obj_request->pages = NULL;
1946                 obj_request->page_count = 0;
1947         }
1948
1949         if (img_request_child_test(img_request)) {
1950                 rbd_assert(img_request->obj_request != NULL);
1951                 more = obj_request->which < img_request->obj_request_count - 1;
1952         } else {
1953                 rbd_assert(img_request->rq != NULL);
1954                 more = blk_end_request(img_request->rq, result, xferred);
1955         }
1956
1957         return more;
1958 }
1959
1960 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1961 {
1962         struct rbd_img_request *img_request;
1963         u32 which = obj_request->which;
1964         bool more = true;
1965
1966         rbd_assert(obj_request_img_data_test(obj_request));
1967         img_request = obj_request->img_request;
1968
1969         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1970         rbd_assert(img_request != NULL);
1971         rbd_assert(img_request->obj_request_count > 0);
1972         rbd_assert(which != BAD_WHICH);
1973         rbd_assert(which < img_request->obj_request_count);
1974         rbd_assert(which >= img_request->next_completion);
1975
1976         spin_lock_irq(&img_request->completion_lock);
1977         if (which != img_request->next_completion)
1978                 goto out;
1979
1980         for_each_obj_request_from(img_request, obj_request) {
1981                 rbd_assert(more);
1982                 rbd_assert(which < img_request->obj_request_count);
1983
1984                 if (!obj_request_done_test(obj_request))
1985                         break;
1986                 more = rbd_img_obj_end_request(obj_request);
1987                 which++;
1988         }
1989
1990         rbd_assert(more ^ (which == img_request->obj_request_count));
1991         img_request->next_completion = which;
1992 out:
1993         spin_unlock_irq(&img_request->completion_lock);
1994
1995         if (!more)
1996                 rbd_img_request_complete(img_request);
1997 }
1998
1999 /*
2000  * Split up an image request into one or more object requests, each
2001  * to a different object.  The "type" parameter indicates whether
2002  * "data_desc" is the pointer to the head of a list of bio
2003  * structures, or the base of a page array.  In either case this
2004  * function assumes data_desc describes memory sufficient to hold
2005  * all data described by the image request.
2006  */
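/*
 * Worked example (hypothetical geometry, obj_order 22, i.e. 4 MB
 * objects): an image request at offset 0x3ff000 with length 0x3000
 * crosses an object boundary and is split into two object requests:
 *
 *      object N:   offset 0x3ff000, length 0x1000
 *      object N+1: offset 0x0,      length 0x2000
 *
 * rbd_segment_offset() and rbd_segment_length() compute the
 * per-object offset and length on each pass through the loop.
 */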
2007 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2008                                         enum obj_request_type type,
2009                                         void *data_desc)
2010 {
2011         struct rbd_device *rbd_dev = img_request->rbd_dev;
2012         struct rbd_obj_request *obj_request = NULL;
2013         struct rbd_obj_request *next_obj_request;
2014         bool write_request = img_request_write_test(img_request);
2015         struct bio *bio_list;
2016         unsigned int bio_offset = 0;
2017         struct page **pages;
2018         u64 img_offset;
2019         u64 resid;
2020         u16 opcode;
2021
2022         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2023                 (int)type, data_desc);
2024
2025         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2026         img_offset = img_request->offset;
2027         resid = img_request->length;
2028         rbd_assert(resid > 0);
2029
2030         if (type == OBJ_REQUEST_BIO) {
2031                 bio_list = data_desc;
2032                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2033         } else {
2034                 rbd_assert(type == OBJ_REQUEST_PAGES);
2035                 pages = data_desc;
2036         }
2037
2038         while (resid) {
2039                 struct ceph_osd_request *osd_req;
2040                 const char *object_name;
2041                 u64 offset;
2042                 u64 length;
2043
2044                 object_name = rbd_segment_name(rbd_dev, img_offset);
2045                 if (!object_name)
2046                         goto out_unwind;
2047                 offset = rbd_segment_offset(rbd_dev, img_offset);
2048                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2049                 obj_request = rbd_obj_request_create(object_name,
2050                                                 offset, length, type);
2051                 /* object request has its own copy of the object name */
2052                 rbd_segment_name_free(object_name);
2053                 if (!obj_request)
2054                         goto out_unwind;
2055
2056                 if (type == OBJ_REQUEST_BIO) {
2057                         unsigned int clone_size;
2058
2059                         rbd_assert(length <= (u64)UINT_MAX);
2060                         clone_size = (unsigned int)length;
2061                         obj_request->bio_list =
2062                                         bio_chain_clone_range(&bio_list,
2063                                                                 &bio_offset,
2064                                                                 clone_size,
2065                                                                 GFP_ATOMIC);
2066                         if (!obj_request->bio_list)
2067                                 goto out_partial;
2068                 } else {
2069                         unsigned int page_count;
2070
2071                         obj_request->pages = pages;
2072                         page_count = (u32)calc_pages_for(offset, length);
2073                         obj_request->page_count = page_count;
2074                         if ((offset + length) & ~PAGE_MASK)
2075                                 page_count--;   /* more on last page */
2076                         pages += page_count;
2077                 }
2078
2079                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2080                                                 obj_request);
2081                 if (!osd_req)
2082                         goto out_partial;
2083                 obj_request->osd_req = osd_req;
2084                 obj_request->callback = rbd_img_obj_callback;
2085
2086                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2087                                                 0, 0);
2088                 if (type == OBJ_REQUEST_BIO)
2089                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2090                                         obj_request->bio_list, length);
2091                 else
2092                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2093                                         obj_request->pages, length,
2094                                         offset & ~PAGE_MASK, false, false);
2095
2096                 if (write_request)
2097                         rbd_osd_req_format_write(obj_request);
2098                 else
2099                         rbd_osd_req_format_read(obj_request);
2100
2101                 obj_request->img_offset = img_offset;
2102                 rbd_img_obj_request_add(img_request, obj_request);
2103
2104                 img_offset += length;
2105                 resid -= length;
2106         }
2107
2108         return 0;
2109
2110 out_partial:
2111         rbd_obj_request_put(obj_request);
2112 out_unwind:
2113         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2114                 rbd_obj_request_put(obj_request);
2115
2116         return -ENOMEM;
2117 }
2118
2119 static void
2120 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2121 {
2122         struct rbd_img_request *img_request;
2123         struct rbd_device *rbd_dev;
2124         struct page **pages;
2125         u32 page_count;
2126
2127         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128         rbd_assert(obj_request_img_data_test(obj_request));
2129         img_request = obj_request->img_request;
2130         rbd_assert(img_request);
2131
2132         rbd_dev = img_request->rbd_dev;
2133         rbd_assert(rbd_dev);
2134
2135         pages = obj_request->copyup_pages;
2136         rbd_assert(pages != NULL);
2137         obj_request->copyup_pages = NULL;
2138         page_count = obj_request->copyup_page_count;
2139         rbd_assert(page_count);
2140         obj_request->copyup_page_count = 0;
2141         ceph_release_page_vector(pages, page_count);
2142
2143         /*
2144          * We want the transfer count to reflect the size of the
2145          * original write request.  There is no such thing as a
2146          * successful short write, so if the request was successful
2147          * we can just set it to the originally-requested length.
2148          */
2149         if (!obj_request->result)
2150                 obj_request->xferred = obj_request->length;
2151
2152         /* Finish up with the normal image object callback */
2153
2154         rbd_img_obj_callback(obj_request);
2155 }
2156
2157 static void
2158 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2159 {
2160         struct rbd_obj_request *orig_request;
2161         struct ceph_osd_request *osd_req;
2162         struct ceph_osd_client *osdc;
2163         struct rbd_device *rbd_dev;
2164         struct page **pages;
2165         u32 page_count;
2166         int result;
2167         u64 parent_length;
2168         u64 offset;
2169         u64 length;
2170
2171         rbd_assert(img_request_child_test(img_request));
2172
2173         /* First get what we need from the image request */
2174
2175         pages = img_request->copyup_pages;
2176         rbd_assert(pages != NULL);
2177         img_request->copyup_pages = NULL;
2178         page_count = img_request->copyup_page_count;
2179         rbd_assert(page_count);
2180         img_request->copyup_page_count = 0;
2181
2182         orig_request = img_request->obj_request;
2183         rbd_assert(orig_request != NULL);
2184         rbd_assert(obj_request_type_valid(orig_request->type));
2185         result = img_request->result;
2186         parent_length = img_request->length;
2187         rbd_assert(parent_length == img_request->xferred);
2188         rbd_img_request_put(img_request);
2189
2190         rbd_assert(orig_request->img_request);
2191         rbd_dev = orig_request->img_request->rbd_dev;
2192         rbd_assert(rbd_dev);
2193
2194         if (result)
2195                 goto out_err;
2196
2197         /*
2198  * The original osd request is of no use to us any more.
2199          * We need a new one that can hold the two ops in a copyup
2200          * request.  Allocate the new copyup osd request for the
2201          * original request, and release the old one.
2202          */
2203         result = -ENOMEM;
2204         osd_req = rbd_osd_req_create_copyup(orig_request);
2205         if (!osd_req)
2206                 goto out_err;
2207         rbd_osd_req_destroy(orig_request->osd_req);
2208         orig_request->osd_req = osd_req;
2209         orig_request->copyup_pages = pages;
2210         orig_request->copyup_page_count = page_count;
2211
2212         /* Initialize the copyup op */
2213
2214         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2215         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2216                                                 false, false);
2217
2218         /* Then the original write request op */
2219
2220         offset = orig_request->offset;
2221         length = orig_request->length;
2222         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2223                                         offset, length, 0, 0);
2224         if (orig_request->type == OBJ_REQUEST_BIO)
2225                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2226                                         orig_request->bio_list, length);
2227         else
2228                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2229                                         orig_request->pages, length,
2230                                         offset & ~PAGE_MASK, false, false);
2231
2232         rbd_osd_req_format_write(orig_request);
2233
2234         /* All set, send it off. */
2235
2236         orig_request->callback = rbd_img_obj_copyup_callback;
2237         osdc = &rbd_dev->rbd_client->client->osdc;
2238         result = rbd_obj_request_submit(osdc, orig_request);
2239         if (!result)
2240                 return;
2241 out_err:
2242         /* Record the error code and complete the request */
2243
2244         orig_request->result = result;
2245         orig_request->xferred = 0;
2246         obj_request_done_set(orig_request);
2247         rbd_obj_request_complete(orig_request);
2248 }
2249
2250 /*
2251  * Read from the parent image the range of data that covers the
2252  * entire target of the given object request.  This is used for
2253  * satisfying a layered image write request when the target of an
2254  * object request from the image request does not exist.
2255  *
2256  * A page array big enough to hold the returned data is allocated
2257  * and supplied to rbd_img_request_fill() as the "data descriptor."
2258  * When the read completes, this page array will be transferred to
2259  * the original object request for the copyup operation.
2260  *
2261  * If an error occurs, record it as the result of the original
2262  * object request and mark it done so it gets completed.
2263  */
2264 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2265 {
2266         struct rbd_img_request *img_request = NULL;
2267         struct rbd_img_request *parent_request = NULL;
2268         struct rbd_device *rbd_dev;
2269         u64 img_offset;
2270         u64 length;
2271         struct page **pages = NULL;
2272         u32 page_count;
2273         int result;
2274
2275         rbd_assert(obj_request_img_data_test(obj_request));
2276         rbd_assert(obj_request_type_valid(obj_request->type));
2277
2278         img_request = obj_request->img_request;
2279         rbd_assert(img_request != NULL);
2280         rbd_dev = img_request->rbd_dev;
2281         rbd_assert(rbd_dev->parent != NULL);
2282
2283         /*
2284          * Determine the byte range covered by the object in the
2285          * child image to which the original request was to be sent.
2286          */
2287         img_offset = obj_request->img_offset - obj_request->offset;
2288         length = (u64)1 << rbd_dev->header.obj_order;
2289
2290         /*
2291          * There is no defined parent data beyond the parent
2292          * overlap, so limit what we read at that boundary if
2293          * necessary.
2294          */
2295         if (img_offset + length > rbd_dev->parent_overlap) {
2296                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2297                 length = rbd_dev->parent_overlap - img_offset;
2298         }
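        /*
         * Worked example (hypothetical numbers): with 4 MB objects
         * and a 6 MB parent overlap, a request for the second object
         * reads parent data at img_offset 4 MB, and length is
         * clamped from 4 MB down to 2 MB.
         */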
2299
2300         /*
2301          * Allocate a page array big enough to receive the data read
2302          * from the parent.
2303          */
2304         page_count = (u32)calc_pages_for(0, length);
2305         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2306         if (IS_ERR(pages)) {
2307                 result = PTR_ERR(pages);
2308                 pages = NULL;
2309                 goto out_err;
2310         }
2311
2312         result = -ENOMEM;
2313         parent_request = rbd_img_request_create(rbd_dev->parent,
2314                                                 img_offset, length,
2315                                                 false, true);
2316         if (!parent_request)
2317                 goto out_err;
2318         rbd_obj_request_get(obj_request);
2319         parent_request->obj_request = obj_request;
2320
2321         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2322         if (result)
2323                 goto out_err;
2324         parent_request->copyup_pages = pages;
2325         parent_request->copyup_page_count = page_count;
2326
2327         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2328         result = rbd_img_request_submit(parent_request);
2329         if (!result)
2330                 return 0;
2331
2332         parent_request->copyup_pages = NULL;
2333         parent_request->copyup_page_count = 0;
2334         parent_request->obj_request = NULL;
2335         rbd_obj_request_put(obj_request);
2336 out_err:
2337         if (pages)
2338                 ceph_release_page_vector(pages, page_count);
2339         if (parent_request)
2340                 rbd_img_request_put(parent_request);
2341         obj_request->result = result;
2342         obj_request->xferred = 0;
2343         obj_request_done_set(obj_request);
2344
2345         return result;
2346 }
2347
2348 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2349 {
2350         struct rbd_obj_request *orig_request;
2351         int result;
2352
2353         rbd_assert(!obj_request_img_data_test(obj_request));
2354
2355         /*
2356          * All we need from the object request is the original
2357          * request and the result of the STAT op.  Grab those, then
2358          * we're done with the request.
2359          */
2360         orig_request = obj_request->obj_request;
2361         obj_request->obj_request = NULL;
2362         rbd_assert(orig_request);
2363         rbd_assert(orig_request->img_request);
2364
2365         result = obj_request->result;
2366         obj_request->result = 0;
2367
2368         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2369                 obj_request, orig_request, result,
2370                 obj_request->xferred, obj_request->length);
2371         rbd_obj_request_put(obj_request);
2372
2373         rbd_assert(orig_request);
2374         rbd_assert(orig_request->img_request);
2375
2376         /*
2377          * Our only purpose here is to determine whether the object
2378          * exists, and we don't want to treat the non-existence as
2379          * an error.  If something else comes back, transfer the
2380          * error to the original request and complete it now.
2381          */
2382         if (!result) {
2383                 obj_request_existence_set(orig_request, true);
2384         } else if (result == -ENOENT) {
2385                 obj_request_existence_set(orig_request, false);
2386         } else {
2387                 orig_request->result = result;
2388                 goto out;
2389         }
2390
2391         /*
2392          * Resubmit the original request now that we have recorded
2393          * whether the target object exists.
2394          */
2395         orig_request->result = rbd_img_obj_request_submit(orig_request);
2396 out:
2397         if (orig_request->result)
2398                 rbd_obj_request_complete(orig_request);
2399         rbd_obj_request_put(orig_request);
2400 }
2401
2402 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2403 {
2404         struct rbd_obj_request *stat_request;
2405         struct rbd_device *rbd_dev;
2406         struct ceph_osd_client *osdc;
2407         struct page **pages = NULL;
2408         u32 page_count;
2409         size_t size;
2410         int ret;
2411
2412         /*
2413          * The response data for a STAT call consists of:
2414          *     le64 length;
2415          *     struct {
2416          *         le32 tv_sec;
2417          *         le32 tv_nsec;
2418          *     } mtime;
2419          */
2420         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2421         page_count = (u32)calc_pages_for(0, size);
2422         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2423         if (IS_ERR(pages))
2424                 return PTR_ERR(pages);
2425
2426         ret = -ENOMEM;
2427         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2428                                                         OBJ_REQUEST_PAGES);
2429         if (!stat_request)
2430                 goto out;
2431
2432         rbd_obj_request_get(obj_request);
2433         stat_request->obj_request = obj_request;
2434         stat_request->pages = pages;
2435         stat_request->page_count = page_count;
2436
2437         rbd_assert(obj_request->img_request);
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2440                                                 stat_request);
2441         if (!stat_request->osd_req)
2442                 goto out;
2443         stat_request->callback = rbd_img_obj_exists_callback;
2444
2445         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2446         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2447                                         false, false);
2448         rbd_osd_req_format_read(stat_request);
2449
2450         osdc = &rbd_dev->rbd_client->client->osdc;
2451         ret = rbd_obj_request_submit(osdc, stat_request);
2452 out:
2453         if (ret)
2454                 rbd_obj_request_put(obj_request);
2455
2456         return ret;
2457 }
2458
2459 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2460 {
2461         struct rbd_img_request *img_request;
2462         struct rbd_device *rbd_dev;
2463         bool known;
2464
2465         rbd_assert(obj_request_img_data_test(obj_request));
2466
2467         img_request = obj_request->img_request;
2468         rbd_assert(img_request);
2469         rbd_dev = img_request->rbd_dev;
2470
2471         /*
2472          * Only writes to layered images need special handling.
2473          * Reads and non-layered writes are simple object requests.
2474          * Layered writes that start beyond the end of the overlap
2475          * with the parent have no parent data, so they too are
2476          * simple object requests.  Finally, if the target object is
2477          * known to already exist, its parent data has already been
2478          * copied, so a write to the object can also be handled as a
2479          * simple object request.
2480          */
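        /*
         * Summary of the test below, case by case:
         *
         *      read                              plain object request
         *      non-layered write                 plain object request
         *      write beyond the parent overlap   plain object request
         *      write, target known to exist      plain object request
         *      write, target known to be absent  parent read + copyup
         *      write, existence unknown          STAT first, resubmit
         */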
2481         if (!img_request_write_test(img_request) ||
2482                 !img_request_layered_test(img_request) ||
2483                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2484                 ((known = obj_request_known_test(obj_request)) &&
2485                         obj_request_exists_test(obj_request))) {
2486
2487                 struct rbd_device *rbd_dev;
2488                 struct ceph_osd_client *osdc;
2489
2490                 rbd_dev = obj_request->img_request->rbd_dev;
2491                 osdc = &rbd_dev->rbd_client->client->osdc;
2492
2493                 return rbd_obj_request_submit(osdc, obj_request);
2494         }
2495
2496         /*
2497          * It's a layered write.  The target object might exist but
2498          * we may not know that yet.  If we know it doesn't exist,
2499          * start by reading the data for the full target object from
2500          * the parent so we can use it for a copyup to the target.
2501          */
2502         if (known)
2503                 return rbd_img_obj_parent_read_full(obj_request);
2504
2505         /* We don't know whether the target exists.  Go find out. */
2506
2507         return rbd_img_obj_exists_submit(obj_request);
2508 }
2509
2510 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2511 {
2512         struct rbd_obj_request *obj_request;
2513         struct rbd_obj_request *next_obj_request;
2514
2515         dout("%s: img %p\n", __func__, img_request);
2516         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2517                 int ret;
2518
2519                 ret = rbd_img_obj_request_submit(obj_request);
2520                 if (ret)
2521                         return ret;
2522         }
2523
2524         return 0;
2525 }
2526
2527 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2528 {
2529         struct rbd_obj_request *obj_request;
2530         struct rbd_device *rbd_dev;
2531         u64 obj_end;
2532
2533         rbd_assert(img_request_child_test(img_request));
2534
2535         obj_request = img_request->obj_request;
2536         rbd_assert(obj_request);
2537         rbd_assert(obj_request->img_request);
2538
2539         obj_request->result = img_request->result;
2540         if (obj_request->result)
2541                 goto out;
2542
2543         /*
2544          * We need to zero anything beyond the parent overlap
2545          * boundary.  Since rbd_img_obj_request_read_callback()
2546          * will zero anything beyond the end of a short read, an
2547          * easy way to do this is to pretend the data from the
2548          * parent came up short--ending at the overlap boundary.
2549          */
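        /*
         * Example (hypothetical numbers): with a 6 MB parent
         * overlap, a child read of 1 MB at img_offset 5.5 MB ends
         * at 6.5 MB; xferred is clamped to 0.5 MB and the callback
         * zero-fills the final 0.5 MB.
         */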
2550         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2551         obj_end = obj_request->img_offset + obj_request->length;
2552         rbd_dev = obj_request->img_request->rbd_dev;
2553         if (obj_end > rbd_dev->parent_overlap) {
2554                 u64 xferred = 0;
2555
2556                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2557                         xferred = rbd_dev->parent_overlap -
2558                                         obj_request->img_offset;
2559
2560                 obj_request->xferred = min(img_request->xferred, xferred);
2561         } else {
2562                 obj_request->xferred = img_request->xferred;
2563         }
2564 out:
2565         rbd_img_request_put(img_request);
2566         rbd_img_obj_request_read_callback(obj_request);
2567         rbd_obj_request_complete(obj_request);
2568 }
2569
2570 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2571 {
2572         struct rbd_device *rbd_dev;
2573         struct rbd_img_request *img_request;
2574         int result;
2575
2576         rbd_assert(obj_request_img_data_test(obj_request));
2577         rbd_assert(obj_request->img_request != NULL);
2578         rbd_assert(obj_request->result == (s32) -ENOENT);
2579         rbd_assert(obj_request_type_valid(obj_request->type));
2580
2581         rbd_dev = obj_request->img_request->rbd_dev;
2582         rbd_assert(rbd_dev->parent != NULL);
2584         img_request = rbd_img_request_create(rbd_dev->parent,
2585                                                 obj_request->img_offset,
2586                                                 obj_request->length,
2587                                                 false, true);
2588         result = -ENOMEM;
2589         if (!img_request)
2590                 goto out_err;
2591
2592         rbd_obj_request_get(obj_request);
2593         img_request->obj_request = obj_request;
2594
2595         if (obj_request->type == OBJ_REQUEST_BIO)
2596                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2597                                                 obj_request->bio_list);
2598         else
2599                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2600                                                 obj_request->pages);
2601         if (result)
2602                 goto out_err;
2603
2604         img_request->callback = rbd_img_parent_read_callback;
2605         result = rbd_img_request_submit(img_request);
2606         if (result)
2607                 goto out_err;
2608
2609         return;
2610 out_err:
2611         if (img_request)
2612                 rbd_img_request_put(img_request);
2613         obj_request->result = result;
2614         obj_request->xferred = 0;
2615         obj_request_done_set(obj_request);
2616 }
2617
2618 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2619 {
2620         struct rbd_obj_request *obj_request;
2621         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2622         int ret;
2623
2624         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2625                                                         OBJ_REQUEST_NODATA);
2626         if (!obj_request)
2627                 return -ENOMEM;
2628
2629         ret = -ENOMEM;
2630         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2631         if (!obj_request->osd_req)
2632                 goto out;
2633         obj_request->callback = rbd_obj_request_put;
2634
2635         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2636                                         notify_id, 0, 0);
2637         rbd_osd_req_format_read(obj_request);
2638
2639         ret = rbd_obj_request_submit(osdc, obj_request);
2640 out:
2641         if (ret)
2642                 rbd_obj_request_put(obj_request);
2643
2644         return ret;
2645 }
2646
2647 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2648 {
2649         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2650         int ret;
2651
2652         if (!rbd_dev)
2653                 return;
2654
2655         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2656                 rbd_dev->header_name, (unsigned long long)notify_id,
2657                 (unsigned int)opcode);
2658         ret = rbd_dev_refresh(rbd_dev);
2659         if (ret)
2660                 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2661
2662         rbd_obj_notify_ack(rbd_dev, notify_id);
2663 }
2664
2665 /*
2666  * Request sync osd watch/unwatch.  The value of "start" determines
2667  * whether a watch request is being initiated or torn down.
2668  */
2669 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2670 {
2671         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2672         struct rbd_obj_request *obj_request;
2673         int ret;
2674
2675         rbd_assert(start ^ !!rbd_dev->watch_event);
2676         rbd_assert(start ^ !!rbd_dev->watch_request);
2677
2678         if (start) {
2679                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2680                                                 &rbd_dev->watch_event);
2681                 if (ret < 0)
2682                         return ret;
2683                 rbd_assert(rbd_dev->watch_event != NULL);
2684         }
2685
2686         ret = -ENOMEM;
2687         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2688                                                         OBJ_REQUEST_NODATA);
2689         if (!obj_request)
2690                 goto out_cancel;
2691
2692         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2693         if (!obj_request->osd_req)
2694                 goto out_cancel;
2695
2696         if (start)
2697                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2698         else
2699                 ceph_osdc_unregister_linger_request(osdc,
2700                                         rbd_dev->watch_request->osd_req);
2701
2702         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2703                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2704         rbd_osd_req_format_write(obj_request);
2705
2706         ret = rbd_obj_request_submit(osdc, obj_request);
2707         if (ret)
2708                 goto out_cancel;
2709         ret = rbd_obj_request_wait(obj_request);
2710         if (ret)
2711                 goto out_cancel;
2712         ret = obj_request->result;
2713         if (ret)
2714                 goto out_cancel;
2715
2716         /*
2717          * A watch request is set to linger, so the underlying osd
2718          * request won't go away until we unregister it.  We retain
2719          * a pointer to the object request during that time (in
2720          * rbd_dev->watch_request), so we'll keep a reference to
2721          * it.  We'll drop that reference (below) after we've
2722          * unregistered it.
2723          */
2724         if (start) {
2725                 rbd_dev->watch_request = obj_request;
2726
2727                 return 0;
2728         }
2729
2730         /* We have successfully torn down the watch request */
2731
2732         rbd_obj_request_put(rbd_dev->watch_request);
2733         rbd_dev->watch_request = NULL;
2734 out_cancel:
2735         /* Cancel the event if we're tearing down, or on error */
2736         ceph_osdc_cancel_event(rbd_dev->watch_event);
2737         rbd_dev->watch_event = NULL;
2738         if (obj_request)
2739                 rbd_obj_request_put(obj_request);
2740
2741         return ret;
2742 }
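/*
 * Intended usage, sketched from the start/teardown asserts above
 * (the callers are not shown here): the watch is established once
 * when the device is mapped and torn down when it is unmapped:
 *
 *      ret = rbd_dev_header_watch_sync(rbd_dev, true);     map
 *      ...
 *      ret = rbd_dev_header_watch_sync(rbd_dev, false);    unmap
 */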
2743
2744 /*
2745  * Synchronous osd object method call.  Returns the number of bytes
2746  * returned in the outbound buffer, or a negative error code.
2747  */
2748 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2749                              const char *object_name,
2750                              const char *class_name,
2751                              const char *method_name,
2752                              const void *outbound,
2753                              size_t outbound_size,
2754                              void *inbound,
2755                              size_t inbound_size)
2756 {
2757         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2758         struct rbd_obj_request *obj_request;
2759         struct page **pages;
2760         u32 page_count;
2761         int ret;
2762
2763         /*
2764          * Method calls are ultimately read operations.  The result
2765          * should be placed into the inbound buffer provided.  They
2766          * also supply outbound data--parameters for the object
2767          * method.  Currently if this is present it will be a
2768          * snapshot id.
2769          */
2770         page_count = (u32)calc_pages_for(0, inbound_size);
2771         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2772         if (IS_ERR(pages))
2773                 return PTR_ERR(pages);
2774
2775         ret = -ENOMEM;
2776         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2777                                                         OBJ_REQUEST_PAGES);
2778         if (!obj_request)
2779                 goto out;
2780
2781         obj_request->pages = pages;
2782         obj_request->page_count = page_count;
2783
2784         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2785         if (!obj_request->osd_req)
2786                 goto out;
2787
2788         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2789                                         class_name, method_name);
2790         if (outbound_size) {
2791                 struct ceph_pagelist *pagelist;
2792
2793                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2794                 if (!pagelist)
2795                         goto out;
2796
2797                 ceph_pagelist_init(pagelist);
2798                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2799                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2800                                                 pagelist);
2801         }
2802         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2803                                         obj_request->pages, inbound_size,
2804                                         0, false, false);
2805         rbd_osd_req_format_read(obj_request);
2806
2807         ret = rbd_obj_request_submit(osdc, obj_request);
2808         if (ret)
2809                 goto out;
2810         ret = rbd_obj_request_wait(obj_request);
2811         if (ret)
2812                 goto out;
2813
2814         ret = obj_request->result;
2815         if (ret < 0)
2816                 goto out;
2817
2818         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2819         ret = (int)obj_request->xferred;
2820         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2821 out:
2822         if (obj_request)
2823                 rbd_obj_request_put(obj_request);
2824         else
2825                 ceph_release_page_vector(pages, page_count);
2826
2827         return ret;
2828 }
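/*
 * A usage sketch with hypothetical class/method names (the real
 * callers are not shown here): reading an image property via an
 * OSD class method could look like
 *
 *      __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *      __le64 size_buf;
 *      int ret;
 *
 *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *                                      "rbd", "get_size",
 *                                      &snapid, sizeof (snapid),
 *                                      &size_buf, sizeof (size_buf));
 *      if (ret < 0)
 *              return ret;
 *
 * The outbound buffer carries the snapshot id parameter mentioned
 * in the comment above; the return value is the byte count placed
 * in size_buf.
 */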
2829
2830 static void rbd_request_fn(struct request_queue *q)
2831                 __releases(q->queue_lock) __acquires(q->queue_lock)
2832 {
2833         struct rbd_device *rbd_dev = q->queuedata;
2834         bool read_only = rbd_dev->mapping.read_only;
2835         struct request *rq;
2836         int result;
2837
2838         while ((rq = blk_fetch_request(q))) {
2839                 bool write_request = rq_data_dir(rq) == WRITE;
2840                 struct rbd_img_request *img_request;
2841                 u64 offset;
2842                 u64 length;
2843
2844                 /* Ignore any non-FS requests that filter through. */
2845
2846                 if (rq->cmd_type != REQ_TYPE_FS) {
2847                         dout("%s: non-fs request type %d\n", __func__,
2848                                 (int) rq->cmd_type);
2849                         __blk_end_request_all(rq, 0);
2850                         continue;
2851                 }
2852
2853                 /* Ignore/skip any zero-length requests */
2854
2855                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2856                 length = (u64) blk_rq_bytes(rq);
2857
2858                 if (!length) {
2859                         dout("%s: zero-length request\n", __func__);
2860                         __blk_end_request_all(rq, 0);
2861                         continue;
2862                 }
2863
2864                 spin_unlock_irq(q->queue_lock);
2865
2866                 /* Disallow writes to a read-only device */
2867
2868                 if (write_request) {
2869                         result = -EROFS;
2870                         if (read_only)
2871                                 goto end_request;
2872                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2873                 }
2874
2875                 /*
2876                  * Quit early if the mapped snapshot no longer
2877                  * exists.  It's still possible the snapshot will
2878                  * have disappeared by the time our request arrives
2879                  * at the osd, but there's no sense in sending it if
2880                  * we already know.
2881                  */
2882                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2883                         dout("request for non-existent snapshot");
2884                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2885                         result = -ENXIO;
2886                         goto end_request;
2887                 }
2888
2889                 result = -EINVAL;
2890                 if (offset && length > U64_MAX - offset + 1) {
2891                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2892                                 offset, length);
2893                         goto end_request;       /* Shouldn't happen */
2894                 }
2895
2896                 result = -EIO;
2897                 if (offset + length > rbd_dev->mapping.size) {
2898                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2899                                 offset, length, rbd_dev->mapping.size);
2900                         goto end_request;
2901                 }
2902
2903                 result = -ENOMEM;
2904                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2905                                                         write_request, false);
2906                 if (!img_request)
2907                         goto end_request;
2908
2909                 img_request->rq = rq;
2910
2911                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2912                                                 rq->bio);
2913                 if (!result)
2914                         result = rbd_img_request_submit(img_request);
2915                 if (result)
2916                         rbd_img_request_put(img_request);
2917 end_request:
2918                 spin_lock_irq(q->queue_lock);
2919                 if (result < 0) {
2920                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2921                                 write_request ? "write" : "read",
2922                                 length, offset, result);
2923
2924                         __blk_end_request_all(rq, result);
2925                 }
2926         }
2927 }
2928
2929 /*
2930  * A queue callback.  Makes sure that we don't create a bio that spans
2931  * across multiple osd objects.  The one exception is single-page bios,
2932  * which we handle later in bio_chain_clone_range().
2933  */
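/*
 * For example, with 4 MB objects (obj_order 22) there are 8192
 * sectors per object, so a bio starting at device sector 8184 can
 * grow by at most 8 more sectors (4 KB) before it would cross into
 * the next object.
 */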
2934 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2935                           struct bio_vec *bvec)
2936 {
2937         struct rbd_device *rbd_dev = q->queuedata;
2938         sector_t sector_offset;
2939         sector_t sectors_per_obj;
2940         sector_t obj_sector_offset;
2941         int ret;
2942
2943         /*
2944          * Convert the partition-relative bio start sector to an
2945          * offset relative to the enclosing device, then find how
2946          * far into its rbd object that offset falls.
2947          */
2948         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2949         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2950         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2951
2952         /*
2953          * Compute the number of bytes from that offset to the end
2954          * of the object.  Account for what's already used by the bio.
2955          */
2956         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2957         if (ret > bmd->bi_size)
2958                 ret -= bmd->bi_size;
2959         else
2960                 ret = 0;
2961
2962         /*
2963          * Don't send back more than was asked for.  And if the bio
2964          * was empty, let the whole thing through because:  "Note
2965          * that a block device *must* allow a single page to be
2966          * added to an empty bio."
2967          */
2968         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2969         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2970                 ret = (int) bvec->bv_len;
2971
2972         return ret;
2973 }
2974
2975 static void rbd_free_disk(struct rbd_device *rbd_dev)
2976 {
2977         struct gendisk *disk = rbd_dev->disk;
2978
2979         if (!disk)
2980                 return;
2981
2982         rbd_dev->disk = NULL;
2983         if (disk->flags & GENHD_FL_UP) {
2984                 del_gendisk(disk);
2985                 if (disk->queue)
2986                         blk_cleanup_queue(disk->queue);
2987         }
2988         put_disk(disk);
2989 }
2990
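/*
 * Synchronously read "length" bytes from the object "object_name",
 * starting at byte "offset", into "buf".  Returns the number of
 * bytes actually transferred on success, or a negative errno.
 */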
2991 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2992                                 const char *object_name,
2993                                 u64 offset, u64 length, void *buf)
2994
2995 {
2996         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2997         struct rbd_obj_request *obj_request;
2998         struct page **pages = NULL;
2999         u32 page_count;
3000         size_t size;
3001         int ret;
3002
3003         page_count = (u32) calc_pages_for(offset, length);
3004         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3005         if (IS_ERR(pages))
3006                 return PTR_ERR(pages);
3007
3008         ret = -ENOMEM;
3009         obj_request = rbd_obj_request_create(object_name, offset, length,
3010                                                         OBJ_REQUEST_PAGES);
3011         if (!obj_request)
3012                 goto out;
3013
3014         obj_request->pages = pages;
3015         obj_request->page_count = page_count;
3016
3017         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3018         if (!obj_request->osd_req)
3019                 goto out;
3020
3021         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3022                                         offset, length, 0, 0);
3023         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3024                                         obj_request->pages,
3025                                         obj_request->length,
3026                                         obj_request->offset & ~PAGE_MASK,
3027                                         false, false);
3028         rbd_osd_req_format_read(obj_request);
3029
3030         ret = rbd_obj_request_submit(osdc, obj_request);
3031         if (ret)
3032                 goto out;
3033         ret = rbd_obj_request_wait(obj_request);
3034         if (ret)
3035                 goto out;
3036
3037         ret = obj_request->result;
3038         if (ret < 0)
3039                 goto out;
3040
3041         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3042         size = (size_t) obj_request->xferred;
3043         ceph_copy_from_page_vector(pages, buf, 0, size);
3044         rbd_assert(size <= (size_t)INT_MAX);
3045         ret = (int)size;
3046 out:
3047         if (obj_request)
3048                 rbd_obj_request_put(obj_request);
3049         else
3050                 ceph_release_page_vector(pages, page_count);
3051
3052         return ret;
3053 }
3054
3055 /*
3056  * Read the complete header for the given rbd device.  On successful
3057  * return, the rbd_dev->header field will contain up-to-date
3058  * information about the image.
3059  */
3060 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3061 {
3062         struct rbd_image_header_ondisk *ondisk = NULL;
3063         u32 snap_count = 0;
3064         u64 names_size = 0;
3065         u32 want_count;
3066         int ret;
3067
3068         /*
3069          * The complete header will include an array of its 64-bit
3070          * snapshot ids, followed by the names of those snapshots as
3071          * a contiguous block of NUL-terminated strings.  Note that
3072          * the number of snapshots could change by the time we read
3073          * it in, in which case we re-read it.
3074          */
3075         do {
3076                 size_t size;
3077
3078                 kfree(ondisk);
3079
3080                 size = sizeof (*ondisk);
3081                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3082                 size += names_size;
3083                 ondisk = kmalloc(size, GFP_KERNEL);
3084                 if (!ondisk)
3085                         return -ENOMEM;
3086
3087                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3088                                        0, size, ondisk);
3089                 if (ret < 0)
3090                         goto out;
3091                 if ((size_t)ret < size) {
3092                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3093                                 size, ret);
3094                         ret = -ENXIO;
3095                         goto out;
3096                 }
3097                 if (!rbd_dev_ondisk_valid(ondisk)) {
3098                         ret = -ENXIO;
3099                         rbd_warn(rbd_dev, "invalid header");
3100                         goto out;
3101                 }
3102
3103                 names_size = le64_to_cpu(ondisk->snap_names_len);
3104                 want_count = snap_count;
3105                 snap_count = le32_to_cpu(ondisk->snap_count);
3106         } while (snap_count != want_count);
3107
3108         ret = rbd_header_from_disk(rbd_dev, ondisk);
3109 out:
3110         kfree(ondisk);
3111
3112         return ret;
3113 }
3114
3115 /*
3116  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3117  * has disappeared from the (just updated) snapshot context.
3118  */
3119 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3120 {
3121         u64 snap_id;
3122
3123         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3124                 return;
3125
3126         snap_id = rbd_dev->spec->snap_id;
3127         if (snap_id == CEPH_NOSNAP)
3128                 return;
3129
3130         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3131                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3132 }
3133
3134 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3135 {
3136         u64 mapping_size;
3137         int ret;
3138
3139         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3140         mapping_size = rbd_dev->mapping.size;
3141         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3142         if (rbd_dev->image_format == 1)
3143                 ret = rbd_dev_v1_header_info(rbd_dev);
3144         else
3145                 ret = rbd_dev_v2_header_info(rbd_dev);
3146
3147         /* If it's a mapped snapshot, validate its EXISTS flag */
3148
3149         rbd_exists_validate(rbd_dev);
3150         mutex_unlock(&ctl_mutex);
3151         if (mapping_size != rbd_dev->mapping.size) {
3152                 sector_t size;
3153
3154                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3155                 dout("setting size to %llu sectors", (unsigned long long)size);
3156                 set_capacity(rbd_dev->disk, size);
3157                 revalidate_disk(rbd_dev->disk);
3158         }
3159
3160         return ret;
3161 }
3162
3163 static int rbd_init_disk(struct rbd_device *rbd_dev)
3164 {
3165         struct gendisk *disk;
3166         struct request_queue *q;
3167         u64 segment_size;
3168
3169         /* create gendisk info */
3170         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3171         if (!disk)
3172                 return -ENOMEM;
3173
3174         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3175                  rbd_dev->dev_id);
3176         disk->major = rbd_dev->major;
3177         disk->first_minor = 0;
3178         disk->fops = &rbd_bd_ops;
3179         disk->private_data = rbd_dev;
3180
3181         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3182         if (!q)
3183                 goto out_disk;
3184
3185         /* We use the default size, but let's be explicit about it. */
3186         blk_queue_physical_block_size(q, SECTOR_SIZE);
3187
3188         /* set io sizes to object size */
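        /* (with rbd's default 4 MB objects this caps each request at 4 MB) */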
3189         segment_size = rbd_obj_bytes(&rbd_dev->header);
3190         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3191         blk_queue_max_segment_size(q, segment_size);
3192         blk_queue_io_min(q, segment_size);
3193         blk_queue_io_opt(q, segment_size);
3194
3195         blk_queue_merge_bvec(q, rbd_merge_bvec);
3196         disk->queue = q;
3197
3198         q->queuedata = rbd_dev;
3199
3200         rbd_dev->disk = disk;
3201
3202         return 0;
3203 out_disk:
3204         put_disk(disk);
3205
3206         return -ENOMEM;
3207 }
3208
3209 /*
3210   sysfs
3211 */
3212
3213 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3214 {
3215         return container_of(dev, struct rbd_device, dev);
3216 }
3217
3218 static ssize_t rbd_size_show(struct device *dev,
3219                              struct device_attribute *attr, char *buf)
3220 {
3221         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3222
3223         return sprintf(buf, "%llu\n",
3224                 (unsigned long long)rbd_dev->mapping.size);
3225 }
3226
3227 /*
3228  * Note this shows the features for whatever's mapped, which is not
3229  * necessarily the base image.
3230  */
3231 static ssize_t rbd_features_show(struct device *dev,
3232                              struct device_attribute *attr, char *buf)
3233 {
3234         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3235
3236         return sprintf(buf, "0x%016llx\n",
3237                         (unsigned long long)rbd_dev->mapping.features);
3238 }
3239
3240 static ssize_t rbd_major_show(struct device *dev,
3241                               struct device_attribute *attr, char *buf)
3242 {
3243         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3244
3245         if (rbd_dev->major)
3246                 return sprintf(buf, "%d\n", rbd_dev->major);
3247
3248         return sprintf(buf, "(none)\n");
3249 }
3251
3252 static ssize_t rbd_client_id_show(struct device *dev,
3253                                   struct device_attribute *attr, char *buf)
3254 {
3255         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3256
3257         return sprintf(buf, "client%lld\n",
3258                         ceph_client_id(rbd_dev->rbd_client->client));
3259 }
3260
3261 static ssize_t rbd_pool_show(struct device *dev,
3262                              struct device_attribute *attr, char *buf)
3263 {
3264         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3265
3266         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3267 }
3268
3269 static ssize_t rbd_pool_id_show(struct device *dev,
3270                              struct device_attribute *attr, char *buf)
3271 {
3272         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3273
3274         return sprintf(buf, "%llu\n",
3275                         (unsigned long long) rbd_dev->spec->pool_id);
3276 }
3277
3278 static ssize_t rbd_name_show(struct device *dev,
3279                              struct device_attribute *attr, char *buf)
3280 {
3281         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3282
3283         if (rbd_dev->spec->image_name)
3284                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3285
3286         return sprintf(buf, "(unknown)\n");
3287 }
3288
3289 static ssize_t rbd_image_id_show(struct device *dev,
3290                              struct device_attribute *attr, char *buf)
3291 {
3292         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3293
3294         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3295 }
3296
3297 /*
3298  * Shows the name of the currently-mapped snapshot (or
3299  * RBD_SNAP_HEAD_NAME for the base image).
3300  */
3301 static ssize_t rbd_snap_show(struct device *dev,
3302                              struct device_attribute *attr,
3303                              char *buf)
3304 {
3305         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3306
3307         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3308 }
3309
3310 /*
3311  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3312  * for the parent image.  If there is no parent, simply shows
3313  * "(no parent image)".
3314  */
3315 static ssize_t rbd_parent_show(struct device *dev,
3316                              struct device_attribute *attr,
3317                              char *buf)
3318 {
3319         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3320         struct rbd_spec *spec = rbd_dev->parent_spec;
3321         int count;
3322         char *bufp = buf;
3323
3324         if (!spec)
3325                 return sprintf(buf, "(no parent image)\n");
3326
3327         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3328                         (unsigned long long) spec->pool_id, spec->pool_name);
3329         if (count < 0)
3330                 return count;
3331         bufp += count;
3332
3333         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3334                         spec->image_name ? spec->image_name : "(unknown)");
3335         if (count < 0)
3336                 return count;
3337         bufp += count;
3338
3339         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3340                         (unsigned long long) spec->snap_id, spec->snap_name);
3341         if (count < 0)
3342                 return count;
3343         bufp += count;
3344
3345         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3346         if (count < 0)
3347                 return count;
3348         bufp += count;
3349
3350         return (ssize_t) (bufp - buf);
3351 }
3352
3353 static ssize_t rbd_image_refresh(struct device *dev,
3354                                  struct device_attribute *attr,
3355                                  const char *buf,
3356                                  size_t size)
3357 {
3358         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3359         int ret;
3360
3361         ret = rbd_dev_refresh(rbd_dev);
3362         if (ret)
3363                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3364
3365         return ret < 0 ? ret : size;
3366 }
3367
3368 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3369 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3370 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3371 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3372 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3373 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3374 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3375 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3376 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3377 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3378 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3379
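/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/, e.g.
 * (output illustrative):
 *
 *   $ cat /sys/bus/rbd/devices/0/size
 *   1073741824
 */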
3380 static struct attribute *rbd_attrs[] = {
3381         &dev_attr_size.attr,
3382         &dev_attr_features.attr,
3383         &dev_attr_major.attr,
3384         &dev_attr_client_id.attr,
3385         &dev_attr_pool.attr,
3386         &dev_attr_pool_id.attr,
3387         &dev_attr_name.attr,
3388         &dev_attr_image_id.attr,
3389         &dev_attr_current_snap.attr,
3390         &dev_attr_parent.attr,
3391         &dev_attr_refresh.attr,
3392         NULL
3393 };
3394
3395 static struct attribute_group rbd_attr_group = {
3396         .attrs = rbd_attrs,
3397 };
3398
3399 static const struct attribute_group *rbd_attr_groups[] = {
3400         &rbd_attr_group,
3401         NULL
3402 };
3403
3404 static void rbd_sysfs_dev_release(struct device *dev)
3405 {
3406 }
3407
3408 static struct device_type rbd_device_type = {
3409         .name           = "rbd",
3410         .groups         = rbd_attr_groups,
3411         .release        = rbd_sysfs_dev_release,
3412 };
3413
3414 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3415 {
3416         kref_get(&spec->kref);
3417
3418         return spec;
3419 }
3420
3421 static void rbd_spec_free(struct kref *kref);
3422 static void rbd_spec_put(struct rbd_spec *spec)
3423 {
3424         if (spec)
3425                 kref_put(&spec->kref, rbd_spec_free);
3426 }
3427
3428 static struct rbd_spec *rbd_spec_alloc(void)
3429 {
3430         struct rbd_spec *spec;
3431
3432         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3433         if (!spec)
3434                 return NULL;
3435         kref_init(&spec->kref);
3436
3437         return spec;
3438 }
3439
3440 static void rbd_spec_free(struct kref *kref)
3441 {
3442         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3443
3444         kfree(spec->pool_name);
3445         kfree(spec->image_id);
3446         kfree(spec->image_name);
3447         kfree(spec->snap_name);
3448         kfree(spec);
3449 }
3450
3451 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3452                                 struct rbd_spec *spec)
3453 {
3454         struct rbd_device *rbd_dev;
3455
3456         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3457         if (!rbd_dev)
3458                 return NULL;
3459
3460         spin_lock_init(&rbd_dev->lock);
3461         rbd_dev->flags = 0;
3462         INIT_LIST_HEAD(&rbd_dev->node);
3463         init_rwsem(&rbd_dev->header_rwsem);
3464
3465         rbd_dev->spec = spec;
3466         rbd_dev->rbd_client = rbdc;
3467
3468         /* Initialize the layout used for all rbd requests */
3469
3470         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3471         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3472         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3473         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3474
3475         return rbd_dev;
3476 }
3477
3478 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3479 {
3480         rbd_put_client(rbd_dev->rbd_client);
3481         rbd_spec_put(rbd_dev->spec);
3482         kfree(rbd_dev);
3483 }
3484
3485 /*
3486  * Get the size and object order for an image snapshot, or if
3487  * snap_id is CEPH_NOSNAP, gets this information for the base
3488  * image.
3489  */
3490 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3491                                 u8 *order, u64 *snap_size)
3492 {
3493         __le64 snapid = cpu_to_le64(snap_id);
3494         int ret;
3495         struct {
3496                 u8 order;
3497                 __le64 size;
3498         } __attribute__ ((packed)) size_buf = { 0 };
3499
3500         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3501                                 "rbd", "get_size",
3502                                 &snapid, sizeof (snapid),
3503                                 &size_buf, sizeof (size_buf));
3504         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3505         if (ret < 0)
3506                 return ret;
3507         if (ret < sizeof (size_buf))
3508                 return -ERANGE;
3509
3510         if (order) {
3511                 *order = size_buf.order;
3512                 dout("  order %u", (unsigned int)*order);
3513         }
3514         *snap_size = le64_to_cpu(size_buf.size);
3515         dout("  snap_id 0x%016llx snap_size = %llu\n",
3516                 (unsigned long long)snap_id, (unsigned long long)*snap_size);
3517
3518         return 0;
3519 }
3520
3521 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3522 {
3523         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3524                                         &rbd_dev->header.obj_order,
3525                                         &rbd_dev->header.image_size);
3526 }
3527
3528 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3529 {
3530         void *reply_buf;
3531         int ret;
3532         void *p;
3533
3534         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3535         if (!reply_buf)
3536                 return -ENOMEM;
3537
3538         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3539                                 "rbd", "get_object_prefix", NULL, 0,
3540                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3541         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3542         if (ret < 0)
3543                 goto out;
3544
3545         p = reply_buf;
3546         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3547                                                 p + ret, NULL, GFP_NOIO);
3548         ret = 0;
3549
3550         if (IS_ERR(rbd_dev->header.object_prefix)) {
3551                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3552                 rbd_dev->header.object_prefix = NULL;
3553         } else {
3554                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3555         }
3556 out:
3557         kfree(reply_buf);
3558
3559         return ret;
3560 }
3561
3562 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3563                 u64 *snap_features)
3564 {
3565         __le64 snapid = cpu_to_le64(snap_id);
3566         struct {
3567                 __le64 features;
3568                 __le64 incompat;
3569         } __attribute__ ((packed)) features_buf = { 0 };
3570         u64 incompat;
3571         int ret;
3572
3573         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3574                                 "rbd", "get_features",
3575                                 &snapid, sizeof (snapid),
3576                                 &features_buf, sizeof (features_buf));
3577         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3578         if (ret < 0)
3579                 return ret;
3580         if (ret < sizeof (features_buf))
3581                 return -ERANGE;
3582
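        /* Refuse the image if it requires features we don't support */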
3583         incompat = le64_to_cpu(features_buf.incompat);
3584         if (incompat & ~RBD_FEATURES_SUPPORTED)
3585                 return -ENXIO;
3586
3587         *snap_features = le64_to_cpu(features_buf.features);
3588
3589         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3590                 (unsigned long long)snap_id,
3591                 (unsigned long long)*snap_features,
3592                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3593
3594         return 0;
3595 }
3596
3597 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3598 {
3599         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3600                                                 &rbd_dev->header.features);
3601 }
3602
3603 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3604 {
3605         struct rbd_spec *parent_spec;
3606         size_t size;
3607         void *reply_buf = NULL;
3608         __le64 snapid;
3609         void *p;
3610         void *end;
3611         u64 pool_id;
3612         char *image_id;
3613         u64 overlap;
3614         int ret;
3615
3616         parent_spec = rbd_spec_alloc();
3617         if (!parent_spec)
3618                 return -ENOMEM;
3619
3620         size = sizeof (__le64) +                                /* pool_id */
3621                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3622                 sizeof (__le64) +                               /* snap_id */
3623                 sizeof (__le64);                                /* overlap */
3624         reply_buf = kmalloc(size, GFP_KERNEL);
3625         if (!reply_buf) {
3626                 ret = -ENOMEM;
3627                 goto out_err;
3628         }
3629
3630         snapid = cpu_to_le64(CEPH_NOSNAP);
3631         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3632                                 "rbd", "get_parent",
3633                                 &snapid, sizeof (snapid),
3634                                 reply_buf, size);
3635         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3636         if (ret < 0)
3637                 goto out_err;
3638
3639         p = reply_buf;
3640         end = reply_buf + ret;
3641         ret = -ERANGE;
3642         ceph_decode_64_safe(&p, end, pool_id, out_err);
3643         if (pool_id == CEPH_NOPOOL)
3644                 goto out;       /* No parent?  No problem. */
3645
3646         /* The ceph file layout needs to fit pool id in 32 bits */
3647
3648         ret = -EIO;
3649         if (pool_id > (u64)U32_MAX) {
3650                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3651                         (unsigned long long)pool_id, U32_MAX);
3652                 goto out_err;
3653         }
3654         parent_spec->pool_id = pool_id;
3655
3656         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3657         if (IS_ERR(image_id)) {
3658                 ret = PTR_ERR(image_id);
3659                 goto out_err;
3660         }
3661         parent_spec->image_id = image_id;
3662         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3663         ceph_decode_64_safe(&p, end, overlap, out_err);
3664
3665         if (overlap) {
3666                 rbd_spec_put(rbd_dev->parent_spec);
3667                 rbd_dev->parent_spec = parent_spec;
3668                 parent_spec = NULL;     /* rbd_dev now owns this */
3669                 rbd_dev->parent_overlap = overlap;
3670         } else {
3671                 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3672         }
3673 out:
3674         ret = 0;
3675 out_err:
3676         kfree(reply_buf);
3677         rbd_spec_put(parent_spec);
3678
3679         return ret;
3680 }
3681
3682 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3683 {
3684         struct {
3685                 __le64 stripe_unit;
3686                 __le64 stripe_count;
3687         } __attribute__ ((packed)) striping_info_buf = { 0 };
3688         size_t size = sizeof (striping_info_buf);
3689         void *p;
3690         u64 obj_size;
3691         u64 stripe_unit;
3692         u64 stripe_count;
3693         int ret;
3694
3695         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3696                                 "rbd", "get_stripe_unit_count", NULL, 0,
3697                                 (char *)&striping_info_buf, size);
3698         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3699         if (ret < 0)
3700                 return ret;
3701         if (ret < size)
3702                 return -ERANGE;
3703
3704         /*
3705          * We don't actually support the "fancy striping" feature
3706          * (STRIPINGV2) yet, but if the striping sizes are the
3707          * defaults the behavior is the same as before.  So find
3708          * out, and only fail if the image has non-default values.
3709          */
3710         ret = -EINVAL;
3711         obj_size = (u64)1 << rbd_dev->header.obj_order;
3712         p = &striping_info_buf;
3713         stripe_unit = ceph_decode_64(&p);
3714         if (stripe_unit != obj_size) {
3715                 rbd_warn(rbd_dev, "unsupported stripe unit "
3716                                 "(got %llu want %llu)",
3717                                 stripe_unit, obj_size);
3718                 return -EINVAL;
3719         }
3720         stripe_count = ceph_decode_64(&p);
3721         if (stripe_count != 1) {
3722                 rbd_warn(rbd_dev, "unsupported stripe count "
3723                                 "(got %llu want 1)", stripe_count);
3724                 return -EINVAL;
3725         }
3726         rbd_dev->header.stripe_unit = stripe_unit;
3727         rbd_dev->header.stripe_count = stripe_count;
3728
3729         return 0;
3730 }
3731
3732 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3733 {
3734         size_t image_id_size;
3735         char *image_id;
3736         void *p;
3737         void *end;
3738         size_t size;
3739         void *reply_buf = NULL;
3740         size_t len = 0;
3741         char *image_name = NULL;
3742         int ret;
3743
3744         rbd_assert(!rbd_dev->spec->image_name);
3745
3746         len = strlen(rbd_dev->spec->image_id);
3747         image_id_size = sizeof (__le32) + len;
3748         image_id = kmalloc(image_id_size, GFP_KERNEL);
3749         if (!image_id)
3750                 return NULL;
3751
3752         p = image_id;
3753         end = image_id + image_id_size;
3754         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3755
3756         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3757         reply_buf = kmalloc(size, GFP_KERNEL);
3758         if (!reply_buf)
3759                 goto out;
3760
3761         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3762                                 "rbd", "dir_get_name",
3763                                 image_id, image_id_size,
3764                                 reply_buf, size);
3765         if (ret < 0)
3766                 goto out;
3767         p = reply_buf;
3768         end = reply_buf + ret;
3769
3770         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3771         if (IS_ERR(image_name))
3772                 image_name = NULL;
3773         else
3774                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3775 out:
3776         kfree(reply_buf);
3777         kfree(image_id);
3778
3779         return image_name;
3780 }
3781
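/*
 * Look up a format 1 snapshot id by name.  The header's snap_names
 * buffer holds the names back to back as NUL-terminated strings
 * (e.g. "snap1\0snap2\0"), in the same order as the ids in
 * snapc->snaps[].
 */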
3782 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3783 {
3784         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3785         const char *snap_name;
3786         u32 which = 0;
3787
3788         /* Skip over names until we find the one we are looking for */
3789
3790         snap_name = rbd_dev->header.snap_names;
3791         while (which < snapc->num_snaps) {
3792                 if (!strcmp(name, snap_name))
3793                         return snapc->snaps[which];
3794                 snap_name += strlen(snap_name) + 1;
3795                 which++;
3796         }
3797         return CEPH_NOSNAP;
3798 }
3799
3800 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3801 {
3802         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3803         u32 which;
3804         bool found = false;
3805         u64 snap_id;
3806
3807         for (which = 0; !found && which < snapc->num_snaps; which++) {
3808                 const char *snap_name;
3809
3810                 snap_id = snapc->snaps[which];
3811                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3812                 if (IS_ERR(snap_name))
3813                         break;
3814                 found = !strcmp(name, snap_name);
3815                 kfree(snap_name);
3816         }
3817         return found ? snap_id : CEPH_NOSNAP;
3818 }
3819
3820 /*
3821  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3822  * no snapshot by that name is found, or if an error occurs.
3823  */
3824 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3825 {
3826         if (rbd_dev->image_format == 1)
3827                 return rbd_v1_snap_id_by_name(rbd_dev, name);
3828
3829         return rbd_v2_snap_id_by_name(rbd_dev, name);
3830 }
3831
3832 /*
3833  * When an rbd image has a parent image, it is identified by the
3834  * pool, image, and snapshot ids (not names).  This function fills
3835  * in the names for those ids.  (It's OK if we can't figure out the
3836  * name for an image id, but the pool and snapshot ids should always
3837  * exist and have names.)  All names in an rbd spec are dynamically
3838  * allocated.
3839  *
3840  * When an image being mapped (not a parent) is probed, we have the
3841  * pool name and pool id, image name and image id, and the snapshot
3842  * name.  The only thing we're missing is the snapshot id.
3843  */
3844 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3845 {
3846         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3847         struct rbd_spec *spec = rbd_dev->spec;
3848         const char *pool_name;
3849         const char *image_name;
3850         const char *snap_name;
3851         int ret;
3852
3853         /*
3854          * An image being mapped will have the pool name (etc.), but
3855          * we need to look up the snapshot id.
3856          */
3857         if (spec->pool_name) {
3858                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3859                         u64 snap_id;
3860
3861                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3862                         if (snap_id == CEPH_NOSNAP)
3863                                 return -ENOENT;
3864                         spec->snap_id = snap_id;
3865                 } else {
3866                         spec->snap_id = CEPH_NOSNAP;
3867                 }
3868
3869                 return 0;
3870         }
3871
3872         /* Get the pool name; we have to make our own copy of this */
3873
3874         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3875         if (!pool_name) {
3876                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3877                 return -EIO;
3878         }
3879         pool_name = kstrdup(pool_name, GFP_KERNEL);
3880         if (!pool_name)
3881                 return -ENOMEM;
3882
3883         /* Fetch the image name; tolerate failure here */
3884
3885         image_name = rbd_dev_image_name(rbd_dev);
3886         if (!image_name)
3887                 rbd_warn(rbd_dev, "unable to get image name");
3888
3889         /* Look up the snapshot name, and make a copy */
3890
3891         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3892         if (!snap_name) {
3893                 ret = -ENOMEM;
3894                 goto out_err;
3895         }
3896
3897         spec->pool_name = pool_name;
3898         spec->image_name = image_name;
3899         spec->snap_name = snap_name;
3900
3901         return 0;
3902 out_err:
3903         kfree(image_name);
3904         kfree(pool_name);
3905
3906         return ret;
3907 }
3908
3909 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3910 {
3911         size_t size;
3912         int ret;
3913         void *reply_buf;
3914         void *p;
3915         void *end;
3916         u64 seq;
3917         u32 snap_count;
3918         struct ceph_snap_context *snapc;
3919         u32 i;
3920
3921         /*
3922          * We'll need room for the seq value (maximum snapshot id),
3923          * snapshot count, and array of that many snapshot ids.
3924          * For now we have a fixed upper limit on the number we're
3925          * prepared to receive.
3926          */
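        /*
         * (With RBD_MAX_SNAP_COUNT of 510 this works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, so the reply fits in one
         * 4 KB page.)
         */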
3927         size = sizeof (__le64) + sizeof (__le32) +
3928                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3929         reply_buf = kzalloc(size, GFP_KERNEL);
3930         if (!reply_buf)
3931                 return -ENOMEM;
3932
3933         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3934                                 "rbd", "get_snapcontext", NULL, 0,
3935                                 reply_buf, size);
3936         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3937         if (ret < 0)
3938                 goto out;
3939
3940         p = reply_buf;
3941         end = reply_buf + ret;
3942         ret = -ERANGE;
3943         ceph_decode_64_safe(&p, end, seq, out);
3944         ceph_decode_32_safe(&p, end, snap_count, out);
3945
3946         /*
3947          * Make sure the reported number of snapshot ids wouldn't go
3948          * beyond the end of our buffer.  But before checking that,
3949          * make sure the computed size of the snapshot context we
3950          * allocate is representable in a size_t.
3951          */
3952         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3953                                  / sizeof (u64)) {
3954                 ret = -EINVAL;
3955                 goto out;
3956         }
3957         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3958                 goto out;
3959         ret = 0;
3960
3961         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3962         if (!snapc) {
3963                 ret = -ENOMEM;
3964                 goto out;
3965         }
3966         snapc->seq = seq;
3967         for (i = 0; i < snap_count; i++)
3968                 snapc->snaps[i] = ceph_decode_64(&p);
3969
3970         ceph_put_snap_context(rbd_dev->header.snapc);
3971         rbd_dev->header.snapc = snapc;
3972
3973         dout("  snap context seq = %llu, snap_count = %u\n",
3974                 (unsigned long long)seq, (unsigned int)snap_count);
3975 out:
3976         kfree(reply_buf);
3977
3978         return ret;
3979 }
3980
3981 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3982                                         u64 snap_id)
3983 {
3984         size_t size;
3985         void *reply_buf;
3986         __le64 snapid;
3987         int ret;
3988         void *p;
3989         void *end;
3990         char *snap_name;
3991
3992         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3993         reply_buf = kmalloc(size, GFP_KERNEL);
3994         if (!reply_buf)
3995                 return ERR_PTR(-ENOMEM);
3996
3997         snapid = cpu_to_le64(snap_id);
3998         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3999                                 "rbd", "get_snapshot_name",
4000                                 &snapid, sizeof (snapid),
4001                                 reply_buf, size);
4002         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4003         if (ret < 0) {
4004                 snap_name = ERR_PTR(ret);
4005                 goto out;
4006         }
4007
4008         p = reply_buf;
4009         end = reply_buf + ret;
4010         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4011         if (IS_ERR(snap_name))
4012                 goto out;
4013
4014         dout("  snap_id 0x%016llx snap_name = %s\n",
4015                 (unsigned long long)snap_id, snap_name);
4016 out:
4017         kfree(reply_buf);
4018
4019         return snap_name;
4020 }
4021
4022 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4023 {
4024         bool first_time = rbd_dev->header.object_prefix == NULL;
4025         int ret;
4026
4027         down_write(&rbd_dev->header_rwsem);
4028
4029         if (first_time) {
4030                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4031                 if (ret)
4032                         goto out;
4033         }
4034
4035         /*
4036          * If the image supports layering, get the parent info.  We
4037          * need to probe the first time regardless.  Thereafter we
4038          * only need to do so if there's a parent, to see if it has
4039          * disappeared due to the mapped image getting flattened.
4040          */
4041         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4042                         (first_time || rbd_dev->parent_spec)) {
4043                 bool warn;
4044
4045                 ret = rbd_dev_v2_parent_info(rbd_dev);
4046                 if (ret)
4047                         goto out;
4048
4049                 /*
4050                  * Print a warning if this is the initial probe and
4051                  * the image has a parent.  Don't print it if the
4052                  * image now being probed is itself a parent.  We
4053                  * can tell at this point because we won't know its
4054                  * pool name yet (just its pool id).
4055                  */
4056                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4057                 if (first_time && warn)
4058                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4059                                         "is EXPERIMENTAL!");
4060         }
4061
4062         ret = rbd_dev_v2_image_size(rbd_dev);
4063         if (ret)
4064                 goto out;
4065
4066         if (rbd_dev->spec->snap_id == CEPH_NOSNAP &&
4067             rbd_dev->mapping.size != rbd_dev->header.image_size)
4068                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4069
4070         ret = rbd_dev_v2_snap_context(rbd_dev);
4071         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4072 out:
4073         up_write(&rbd_dev->header_rwsem);
4074
4075         return ret;
4076 }
4077
4078 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4079 {
4080         struct device *dev;
4081         int ret;
4082
4083         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4084
4085         dev = &rbd_dev->dev;
4086         dev->bus = &rbd_bus_type;
4087         dev->type = &rbd_device_type;
4088         dev->parent = &rbd_root_dev;
4089         dev->release = rbd_dev_device_release;
4090         dev_set_name(dev, "%d", rbd_dev->dev_id);
4091         ret = device_register(dev);
4092
4093         mutex_unlock(&ctl_mutex);
4094
4095         return ret;
4096 }
4097
4098 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4099 {
4100         device_unregister(&rbd_dev->dev);
4101 }
4102
4103 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4104
4105 /*
4106  * Get a unique rbd identifier for the given new rbd_dev, and add
4107  * the rbd_dev to the global list.  The minimum rbd id is 1.
4108  */
4109 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4110 {
4111         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4112
4113         spin_lock(&rbd_dev_list_lock);
4114         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4115         spin_unlock(&rbd_dev_list_lock);
4116         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4117                 (unsigned long long) rbd_dev->dev_id);
4118 }
4119
4120 /*
4121  * Remove an rbd_dev from the global list, and record that its
4122  * identifier is no longer in use.
4123  */
4124 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4125 {
4126         struct list_head *tmp;
4127         int rbd_id = rbd_dev->dev_id;
4128         int max_id;
4129
4130         rbd_assert(rbd_id > 0);
4131
4132         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4133                 (unsigned long long) rbd_dev->dev_id);
4134         spin_lock(&rbd_dev_list_lock);
4135         list_del_init(&rbd_dev->node);
4136
4137         /*
4138          * If the id being "put" is not the current maximum, there
4139          * is nothing special we need to do.
4140          */
4141         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4142                 spin_unlock(&rbd_dev_list_lock);
4143                 return;
4144         }
4145
4146         /*
4147          * We need to update the current maximum id.  Search the
4148          * list to find out what it is.  We're more likely to find
4149          * the maximum at the end, so search the list backward.
4150          */
4151         max_id = 0;
4152         list_for_each_prev(tmp, &rbd_dev_list) {
4153                 struct rbd_device *rbd_dev;
4154
4155                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4156                 if (rbd_dev->dev_id > max_id)
4157                         max_id = rbd_dev->dev_id;
4158         }
4159         spin_unlock(&rbd_dev_list_lock);
4160
4161         /*
4162          * The max id could have been updated by rbd_dev_id_get(), in
4163          * which case it now accurately reflects the new maximum.
4164          * Be careful not to overwrite the maximum value in that
4165          * case.
4166          */
4167         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4168         dout("  max dev id has been reset\n");
4169 }
4170
4171 /*
4172  * Skips over white space at *buf, and updates *buf to point to the
4173  * first found non-space character (if any). Returns the length of
4174  * the token (string of non-white space characters) found.  Note
4175  * that *buf must be terminated with '\0'.
4176  */
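/*
 * For example, with *buf pointing at "  pool1 image1", next_token()
 * leaves *buf pointing at "pool1 image1" and returns 5.
 */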
4177 static inline size_t next_token(const char **buf)
4178 {
4179         /*
4180          * These are the characters that produce nonzero for
4181          * isspace() in the "C" and "POSIX" locales.
4182          */
4183         const char *spaces = " \f\n\r\t\v";
4184
4185         *buf += strspn(*buf, spaces);   /* Find start of token */
4186
4187         return strcspn(*buf, spaces);   /* Return token length */
4188 }
4189
4190 /*
4191  * Finds the next token in *buf, and if the provided token buffer is
4192  * big enough, copies the found token into it.  The result, if
4193  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4194  * must be terminated with '\0' on entry.
4195  *
4196  * Returns the length of the token found (not including the '\0').
4197  * Return value will be 0 if no token is found, and it will be >=
4198  * token_size if the token would not fit.
4199  *
4200  * The *buf pointer will be updated to point beyond the end of the
4201  * found token.  Note that this occurs even if the token buffer is
4202  * too small to hold it.
4203  */
4204 static inline size_t copy_token(const char **buf,
4205                                 char *token,
4206                                 size_t token_size)
4207 {
4208         size_t len;
4209
4210         len = next_token(buf);
4211         if (len < token_size) {
4212                 memcpy(token, *buf, len);
4213                 *(token + len) = '\0';
4214         }
4215         *buf += len;
4216
4217         return len;
4218 }
4219
4220 /*
4221  * Finds the next token in *buf, dynamically allocates a buffer big
4222  * enough to hold a copy of it, and copies the token into the new
4223  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4224  * that a duplicate buffer is created even for a zero-length token.
4225  *
4226  * Returns a pointer to the newly-allocated duplicate, or a null
4227  * pointer if memory for the duplicate was not available.  If
4228  * the lenp argument is a non-null pointer, the length of the token
4229  * (not including the '\0') is returned in *lenp.
4230  *
4231  * If successful, the *buf pointer will be updated to point beyond
4232  * the end of the found token.
4233  *
4234  * Note: uses GFP_KERNEL for allocation.
4235  */
4236 static inline char *dup_token(const char **buf, size_t *lenp)
4237 {
4238         char *dup;
4239         size_t len;
4240
4241         len = next_token(buf);
4242         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4243         if (!dup)
4244                 return NULL;
4245         *(dup + len) = '\0';
4246         *buf += len;
4247
4248         if (lenp)
4249                 *lenp = len;
4250
4251         return dup;
4252 }
4253
4254 /*
4255  * Parse the options provided for an "rbd add" (i.e., rbd image
4256  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4257  * and the data written is passed here via a NUL-terminated buffer.
4258  * Returns 0 if successful or an error code otherwise.
4259  *
4260  * The information extracted from these options is recorded in
4261  * the other parameters which return dynamically-allocated
4262  * structures:
4263  *  ceph_opts
4264  *      The address of a pointer that will refer to a ceph options
4265  *      structure.  Caller must release the returned pointer using
4266  *      ceph_destroy_options() when it is no longer needed.
4267  *  rbd_opts
4268  *      Address of an rbd options pointer.  Fully initialized by
4269  *      this function; caller must release with kfree().
4270  *  spec
4271  *      Address of an rbd image specification pointer.  Fully
4272  *      initialized by this function based on parsed options.
4273  *      Caller must release with rbd_spec_put().
4274  *
4275  * The options passed take this form:
4276  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4277  * where:
4278  *  <mon_addrs>
4279  *      A comma-separated list of one or more monitor addresses.
4280  *      A monitor address is an ip address, optionally followed
4281  *      by a port number (separated by a colon).
4282  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4283  *  <options>
4284  *      A comma-separated list of ceph and/or rbd options.
4285  *  <pool_name>
4286  *      The name of the rados pool containing the rbd image.
4287  *  <image_name>
4288  *      The name of the image in that pool to map.
4289  *  <snap_name>
4290  *      An optional snapshot name.  If provided, the mapping will
4291  *      present data from the image at the time that snapshot was
4292  *      created.  The image head is used if no snapshot name is
4293  *      provided.  Snapshot mappings are always read-only.
4294  */
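/*
 * For example (all values illustrative), an image might be mapped
 * with:
 *
 *   $ echo "1.2.3.4:6789 name=admin mypool myimage" > /sys/bus/rbd/add
 */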
4295 static int rbd_add_parse_args(const char *buf,
4296                                 struct ceph_options **ceph_opts,
4297                                 struct rbd_options **opts,
4298                                 struct rbd_spec **rbd_spec)
4299 {
4300         size_t len;
4301         char *options;
4302         const char *mon_addrs;
4303         char *snap_name;
4304         size_t mon_addrs_size;
4305         struct rbd_spec *spec = NULL;
4306         struct rbd_options *rbd_opts = NULL;
4307         struct ceph_options *copts;
4308         int ret;
4309
4310         /* The first four tokens are required */
4311
4312         len = next_token(&buf);
4313         if (!len) {
4314                 rbd_warn(NULL, "no monitor address(es) provided");
4315                 return -EINVAL;
4316         }
4317         mon_addrs = buf;
4318         mon_addrs_size = len + 1;
4319         buf += len;
4320
4321         ret = -EINVAL;
4322         options = dup_token(&buf, NULL);
4323         if (!options)
4324                 return -ENOMEM;
4325         if (!*options) {
4326                 rbd_warn(NULL, "no options provided");
4327                 goto out_err;
4328         }
4329
4330         spec = rbd_spec_alloc();
4331         if (!spec)
4332                 goto out_mem;
4333
4334         spec->pool_name = dup_token(&buf, NULL);
4335         if (!spec->pool_name)
4336                 goto out_mem;
4337         if (!*spec->pool_name) {
4338                 rbd_warn(NULL, "no pool name provided");
4339                 goto out_err;
4340         }
4341
4342         spec->image_name = dup_token(&buf, NULL);
4343         if (!spec->image_name)
4344                 goto out_mem;
4345         if (!*spec->image_name) {
4346                 rbd_warn(NULL, "no image name provided");
4347                 goto out_err;
4348         }
4349
4350         /*
4351          * Snapshot name is optional; default is to use "-"
4352          * (indicating the head/no snapshot).
4353          */
4354         len = next_token(&buf);
4355         if (!len) {
4356                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4357                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4358         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4359                 ret = -ENAMETOOLONG;
4360                 goto out_err;
4361         }
4362         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4363         if (!snap_name)
4364                 goto out_mem;
4365         *(snap_name + len) = '\0';
4366         spec->snap_name = snap_name;
4367
4368         /* Initialize all rbd options to the defaults */
4369
4370         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4371         if (!rbd_opts)
4372                 goto out_mem;
4373
4374         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4375
4376         copts = ceph_parse_options(options, mon_addrs,
4377                                         mon_addrs + mon_addrs_size - 1,
4378                                         parse_rbd_opts_token, rbd_opts);
4379         if (IS_ERR(copts)) {
4380                 ret = PTR_ERR(copts);
4381                 goto out_err;
4382         }
4383         kfree(options);
4384
4385         *ceph_opts = copts;
4386         *opts = rbd_opts;
4387         *rbd_spec = spec;
4388
4389         return 0;
4390 out_mem:
4391         ret = -ENOMEM;
4392 out_err:
4393         kfree(rbd_opts);
4394         rbd_spec_put(spec);
4395         kfree(options);
4396
4397         return ret;
4398 }
4399
4400 /*
4401  * An rbd format 2 image has a unique identifier, distinct from the
4402  * name given to it by the user.  Internally, that identifier is
4403  * what's used to specify the names of objects related to the image.
4404  *
4405  * A special "rbd id" object is used to map an rbd image name to its
4406  * id.  If that object doesn't exist, then there is no v2 rbd image
4407  * with the supplied name.
4408  *
4409  * This function will record the given rbd_dev's image_id field if
4410  * it can be determined, and in that case will return 0.  If any
4411  * errors occur a negative errno will be returned and the rbd_dev's
4412  * image_id field will be unchanged (and should be NULL).
4413  */
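/*
 * For an image named "foo", for example, the id is stored in an
 * object named "rbd_id.foo" (RBD_ID_PREFIX followed by the image
 * name).
 */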
4414 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4415 {
4416         int ret;
4417         size_t size;
4418         char *object_name;
4419         void *response;
4420         char *image_id;
4421
4422         /*
4423          * When probing a parent image, the image id is already
4424          * known (and the image name likely is not).  There's no
4425          * need to fetch the image id again in this case.  We
4426          * do still need to set the image format though.
4427          */
4428         if (rbd_dev->spec->image_id) {
4429                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4430
4431                 return 0;
4432         }
4433
4434         /*
4435          * First, see if the format 2 image id file exists, and if
4436          * so, get the image's persistent id from it.
4437          */
4438         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4439         object_name = kmalloc(size, GFP_NOIO);
4440         if (!object_name)
4441                 return -ENOMEM;
4442         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4443         dout("rbd id object name is %s\n", object_name);
4444
4445         /* Response will be an encoded string, which includes a length */
4446
4447         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4448         response = kzalloc(size, GFP_NOIO);
4449         if (!response) {
4450                 ret = -ENOMEM;
4451                 goto out;
4452         }
4453
4454         /* If it doesn't exist we'll assume it's a format 1 image */
4455
4456         ret = rbd_obj_method_sync(rbd_dev, object_name,
4457                                 "rbd", "get_id", NULL, 0,
4458                                 response, size);
4459         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4460         if (ret == -ENOENT) {
4461                 image_id = kstrdup("", GFP_KERNEL);
4462                 ret = image_id ? 0 : -ENOMEM;
4463                 if (!ret)
4464                         rbd_dev->image_format = 1;
4465         } else if (ret > (int) sizeof (__le32)) {
4466                 void *p = response;
4467
4468                 image_id = ceph_extract_encoded_string(&p, p + ret,
4469                                                 NULL, GFP_NOIO);
4470                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4471                 if (!ret)
4472                         rbd_dev->image_format = 2;
4473         } else if (ret >= 0) {
4474                 ret = -EINVAL;
4475         }
4476
4477         if (!ret) {
4478                 rbd_dev->spec->image_id = image_id;
4479                 dout("image_id is %s\n", image_id);
4480         }
4481 out:
4482         kfree(response);
4483         kfree(object_name);
4484
4485         return ret;
4486 }
4487
4488 /* Undo whatever state changes are made by v1 or v2 image probe */
4489
4490 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4491 {
4492         struct rbd_image_header *header;
4493
4494         rbd_dev_remove_parent(rbd_dev);
4495         rbd_spec_put(rbd_dev->parent_spec);
4496         rbd_dev->parent_spec = NULL;
4497         rbd_dev->parent_overlap = 0;
4498
4499         /* Free dynamic fields from the header, then zero it out */
4500
4501         header = &rbd_dev->header;
4502         ceph_put_snap_context(header->snapc);
4503         kfree(header->snap_sizes);
4504         kfree(header->snap_names);
4505         kfree(header->object_prefix);
4506         memset(header, 0, sizeof (*header));
4507 }
4508
4509 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4510 {
4511         int ret;
4512
4513         ret = rbd_dev_v2_object_prefix(rbd_dev);
4514         if (ret)
4515                 goto out_err;
4516
4517         /*
4518          * Get and check the features for the image.  Currently the
4519          * features are assumed to never change.
4520          */
4521         ret = rbd_dev_v2_features(rbd_dev);
4522         if (ret)
4523                 goto out_err;
4524
4525         /* If the image supports fancy striping, get its parameters */
4526
4527         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4528                 ret = rbd_dev_v2_striping_info(rbd_dev);
4529                 if (ret < 0)
4530                         goto out_err;
4531         }
4532         /* Crypto and compression types are not supported for format 2 images */
4533
4534         return 0;
4535 out_err:
4536         rbd_dev->header.features = 0;
4537         kfree(rbd_dev->header.object_prefix);
4538         rbd_dev->header.object_prefix = NULL;
4539
4540         return ret;
4541 }
4542
4543 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4544 {
4545         struct rbd_device *parent = NULL;
4546         struct rbd_spec *parent_spec;
4547         struct rbd_client *rbdc;
4548         int ret;
4549
4550         if (!rbd_dev->parent_spec)
4551                 return 0;
4552         /*
4553          * We need to pass a reference to the client and the parent
4554          * spec when creating the parent rbd_dev.  Images related by
4555          * parent/child relationships always share both.
4556          */
4557         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4558         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4559
4560         ret = -ENOMEM;
4561         parent = rbd_dev_create(rbdc, parent_spec);
4562         if (!parent)
4563                 goto out_err;
4564
4565         ret = rbd_dev_image_probe(parent, false);
4566         if (ret < 0)
4567                 goto out_err;
4568         rbd_dev->parent = parent;
4569
4570         return 0;
4571 out_err:
4572         if (parent) {
4573                 /* destroying the parent drops the client and spec refs
4574                  * taken above; header_name is freed by our caller */
4575                 rbd_dev_destroy(parent);
4576         } else {
4577                 rbd_put_client(rbdc);
4578                 rbd_spec_put(parent_spec);
4579         }
4580
4581         return ret;
4582 }
4583
4584 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4585 {
4586         int ret;
4587
4588         /* generate unique id: find highest unique id, add one */
4589         rbd_dev_id_get(rbd_dev);
4590
4591         /* Fill in the device name, now that we have its id. */
4592         BUILD_BUG_ON(DEV_NAME_LEN
4593                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4594         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4595
4596         /* Get our block major device number. */
4597
4598         ret = register_blkdev(0, rbd_dev->name);
4599         if (ret < 0)
4600                 goto err_out_id;
4601         rbd_dev->major = ret;
4602
4603         /* Set up the blkdev mapping. */
4604
4605         ret = rbd_init_disk(rbd_dev);
4606         if (ret)
4607                 goto err_out_blkdev;
4608
4609         ret = rbd_dev_mapping_set(rbd_dev);
4610         if (ret)
4611                 goto err_out_disk;
4612         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4613
4614         ret = rbd_bus_add_dev(rbd_dev);
4615         if (ret)
4616                 goto err_out_mapping;
4617
4618         /* Everything's ready.  Announce the disk to the world. */
4619
4620         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4621         add_disk(rbd_dev->disk);
4622
4623         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4624                 (unsigned long long) rbd_dev->mapping.size);
4625
4626         return ret;
4627
4628 err_out_mapping:
4629         rbd_dev_mapping_clear(rbd_dev);
4630 err_out_disk:
4631         rbd_free_disk(rbd_dev);
4632 err_out_blkdev:
4633         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4634 err_out_id:
4635         rbd_dev_id_put(rbd_dev);
4637
4638         return ret;
4639 }
4640
4641 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4642 {
4643         struct rbd_spec *spec = rbd_dev->spec;
4644         size_t size;
4645
4646         /* Record the header object name for this rbd image. */
4647
4648         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4649
4650         if (rbd_dev->image_format == 1)
4651                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4652         else
4653                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4654
4655         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4656         if (!rbd_dev->header_name)
4657                 return -ENOMEM;
4658
4659         if (rbd_dev->image_format == 1)
4660                 sprintf(rbd_dev->header_name, "%s%s",
4661                         spec->image_name, RBD_SUFFIX);
4662         else
4663                 sprintf(rbd_dev->header_name, "%s%s",
4664                         RBD_HEADER_PREFIX, spec->image_id);
4665         return 0;
4666 }
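     /*
      * For example, assuming the usual definitions in rbd_types.h
      * (RBD_SUFFIX ".rbd", RBD_HEADER_PREFIX "rbd_header."): a format 1
      * image named "foo" gets header object "foo.rbd", while a format 2
      * image with id "10052ae8944a" gets "rbd_header.10052ae8944a".
      */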
4667
4668 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4669 {
4670         rbd_dev_unprobe(rbd_dev);
4671         kfree(rbd_dev->header_name);
4672         rbd_dev->header_name = NULL;
4673         rbd_dev->image_format = 0;
4674         kfree(rbd_dev->spec->image_id);
4675         rbd_dev->spec->image_id = NULL;
4676
4677         rbd_dev_destroy(rbd_dev);
4678 }
4679
4680 /*
4681  * Probe for the existence of the header object for the given rbd
4682  * device.  If this image is the one being mapped (i.e., not a
4683  * parent), initiate a watch on its header object before using that
4684  * object to get detailed information about the rbd image.
4685  */
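     /*
      * Sketch of the sequence implemented below: determine the image id
      * (and with it the format), build the header object name, set up
      * the watch if requested, read the header (v1 or v2), fill in the
      * remaining spec fields, then probe any parent image.  The error
      * labels unwind these steps in reverse order.
      */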
4686 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4687 {
4688         int ret;
4689         int tmp;
4690
4691         /*
4692          * Get the id from the image id object.  If it's not a
4693          * format 2 image, we'll get ENOENT back, and we'll assume
4694          * it's a format 1 image.
4695          */
4696         ret = rbd_dev_image_id(rbd_dev);
4697         if (ret)
4698                 return ret;
4699         rbd_assert(rbd_dev->spec->image_id);
4700         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4701
4702         ret = rbd_dev_header_name(rbd_dev);
4703         if (ret)
4704                 goto err_out_format;
4705
4706         if (mapping) {
4707                 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4708                 if (ret)
4709                         goto out_header_name;
4710         }
4711
4712         if (rbd_dev->image_format == 1)
4713                 ret = rbd_dev_v1_header_info(rbd_dev);
4714         else
4715                 ret = rbd_dev_v2_header_info(rbd_dev);
4716         if (ret)
4717                 goto err_out_watch;
4718
4719         ret = rbd_dev_spec_update(rbd_dev);
4720         if (ret)
4721                 goto err_out_probe;
4722
4723         ret = rbd_dev_probe_parent(rbd_dev);
4724         if (ret)
4725                 goto err_out_probe;
4726
4727         dout("discovered format %u image, header name is %s\n",
4728                 rbd_dev->image_format, rbd_dev->header_name);
4729
4730         return 0;
4731 err_out_probe:
4732         rbd_dev_unprobe(rbd_dev);
4733 err_out_watch:
4734         if (mapping) {
4735                 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4736                 if (tmp)
4737                         rbd_warn(rbd_dev, "unable to tear down "
4738                                         "watch request (%d)\n", tmp);
4739         }
4740 out_header_name:
4741         kfree(rbd_dev->header_name);
4742         rbd_dev->header_name = NULL;
4743 err_out_format:
4744         rbd_dev->image_format = 0;
4745         kfree(rbd_dev->spec->image_id);
4746         rbd_dev->spec->image_id = NULL;
4747
4748         dout("probe failed, returning %d\n", ret);
4749
4750         return ret;
4751 }
4752
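     /*
      * A hypothetical mapping example (the add syntax is described in
      * Documentation/ABI/testing/sysfs-bus-rbd):
      *
      *   # echo "192.168.0.1:6789 name=admin,secret=<key> rbd foo" \
      *         > /sys/bus/rbd/add
      *
      * parses the monitor address, options, pool name and image name,
      * then probes and announces the image as /dev/rbd<id>.
      */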
4753 static ssize_t rbd_add(struct bus_type *bus,
4754                        const char *buf,
4755                        size_t count)
4756 {
4757         struct rbd_device *rbd_dev = NULL;
4758         struct ceph_options *ceph_opts = NULL;
4759         struct rbd_options *rbd_opts = NULL;
4760         struct rbd_spec *spec = NULL;
4761         struct rbd_client *rbdc;
4762         struct ceph_osd_client *osdc;
4763         bool read_only;
4764         int rc = -ENOMEM;
4765
4766         if (!try_module_get(THIS_MODULE))
4767                 return -ENODEV;
4768
4769         /* parse add command */
4770         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4771         if (rc < 0)
4772                 goto err_out_module;
4773         read_only = rbd_opts->read_only;
4774         kfree(rbd_opts);
4775         rbd_opts = NULL;        /* done with this */
4776
4777         rbdc = rbd_get_client(ceph_opts);
4778         if (IS_ERR(rbdc)) {
4779                 rc = PTR_ERR(rbdc);
4780                 goto err_out_args;
4781         }
4782         ceph_opts = NULL;       /* rbd_dev client now owns this */
4783
4784         /* pick the pool */
4785         osdc = &rbdc->client->osdc;
4786         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4787         if (rc < 0)
4788                 goto err_out_client;
4789         spec->pool_id = (u64)rc;
4790
4791         /* The ceph file layout needs to fit pool id in 32 bits */
4792
4793         if (spec->pool_id > (u64)U32_MAX) {
4794                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4795                                 (unsigned long long)spec->pool_id, U32_MAX);
4796                 rc = -EIO;
4797                 goto err_out_client;
4798         }
4799
4800         rbd_dev = rbd_dev_create(rbdc, spec);
4801         if (!rbd_dev)
4802                 goto err_out_client;
4803         rbdc = NULL;            /* rbd_dev now owns this */
4804         spec = NULL;            /* rbd_dev now owns this */
4805
4806         rc = rbd_dev_image_probe(rbd_dev, true);
4807         if (rc < 0)
4808                 goto err_out_rbd_dev;
4809
4810         /* If we are mapping a snapshot it must be marked read-only */
4811
4812         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4813                 read_only = true;
4814         rbd_dev->mapping.read_only = read_only;
4815
4816         rc = rbd_dev_device_setup(rbd_dev);
4817         if (!rc)
4818                 return count;
4819
4820         rbd_dev_image_release(rbd_dev);
             goto err_out_module;    /* rbd_dev_image_release() destroyed rbd_dev */
4821 err_out_rbd_dev:
4822         rbd_dev_destroy(rbd_dev);
4823 err_out_client:
4824         rbd_put_client(rbdc);
4825 err_out_args:
4826         if (ceph_opts)
4827                 ceph_destroy_options(ceph_opts);
4828         kfree(rbd_opts);
4829         rbd_spec_put(spec);
4830 err_out_module:
4831         module_put(THIS_MODULE);
4832
4833         dout("Error adding device %s\n", buf);
4834
4835         return (ssize_t)rc;
4836 }
4837
4838 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4839 {
4840         struct list_head *tmp;
4841         struct rbd_device *rbd_dev;
4842
4843         spin_lock(&rbd_dev_list_lock);
4844         list_for_each(tmp, &rbd_dev_list) {
4845                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4846                 if (rbd_dev->dev_id == dev_id) {
4847                         spin_unlock(&rbd_dev_list_lock);
4848                         return rbd_dev;
4849                 }
4850         }
4851         spin_unlock(&rbd_dev_list_lock);
4852         return NULL;
4853 }
4854
4855 static void rbd_dev_device_release(struct device *dev)
4856 {
4857         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4858
4859         rbd_free_disk(rbd_dev);
4860         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4861         rbd_dev_mapping_clear(rbd_dev);
4862         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4863         rbd_dev->major = 0;
4864         rbd_dev_id_put(rbd_dev);
4866 }
4867
4868 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4869 {
4870         while (rbd_dev->parent) {
4871                 struct rbd_device *first = rbd_dev;
4872                 struct rbd_device *second = first->parent;
4873                 struct rbd_device *third;
4874
4875                 /*
4876                  * Follow to the parent with no grandparent and
4877                  * remove it.
4878                  */
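                     /*
                      * E.g. for a (hypothetical) chain where rbd_dev's
                      * parent is A and A's parent is B: the first pass
                      * releases B, the second releases A.
                      */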
4879                 while (second && (third = second->parent)) {
4880                         first = second;
4881                         second = third;
4882                 }
4883                 rbd_assert(second);
4884                 rbd_dev_image_release(second);
4885                 first->parent = NULL;
4886                 first->parent_overlap = 0;
4887
4888                 rbd_assert(first->parent_spec);
4889                 rbd_spec_put(first->parent_spec);
4890                 first->parent_spec = NULL;
4891         }
4892 }
4893
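     /*
      * A hypothetical unmapping example (see
      * Documentation/ABI/testing/sysfs-bus-rbd):
      *
      *   # echo 1 > /sys/bus/rbd/remove
      *
      * removes the device with id 1 (/dev/rbd1), failing with -EBUSY
      * if it is still open.
      */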
4894 static ssize_t rbd_remove(struct bus_type *bus,
4895                           const char *buf,
4896                           size_t count)
4897 {
4898         struct rbd_device *rbd_dev = NULL;
4899         int target_id;
4900         unsigned long ul;
4901         int ret;
4902
4903         ret = kstrtoul(buf, 10, &ul);
4904         if (ret)
4905                 return ret;
4906
4907         /* convert to int; abort if we lost anything in the conversion */
4908         target_id = (int) ul;
4909         if (target_id != ul)
4910                 return -EINVAL;
4911
4912         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4913
4914         rbd_dev = __rbd_get_dev(target_id);
4915         if (!rbd_dev) {
4916                 ret = -ENOENT;
4917                 goto done;
4918         }
4919
4920         spin_lock_irq(&rbd_dev->lock);
4921         if (rbd_dev->open_count)
4922                 ret = -EBUSY;
4923         else
4924                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4925         spin_unlock_irq(&rbd_dev->lock);
4926         if (ret < 0)
4927                 goto done;
4928         rbd_bus_del_dev(rbd_dev);
4929         ret = rbd_dev_header_watch_sync(rbd_dev, false);
4930         if (ret)
4931                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4932         rbd_dev_image_release(rbd_dev);
4933         module_put(THIS_MODULE);
4934         ret = count;
4935 done:
4936         mutex_unlock(&ctl_mutex);
4937
4938         return ret;
4939 }
4940
4941 /*
4942  * create control files in sysfs
4943  * /sys/bus/rbd/...
4944  */
4945 static int rbd_sysfs_init(void)
4946 {
4947         int ret;
4948
4949         ret = device_register(&rbd_root_dev);
4950         if (ret < 0)
4951                 return ret;
4952
4953         ret = bus_register(&rbd_bus_type);
4954         if (ret < 0)
4955                 device_unregister(&rbd_root_dev);
4956
4957         return ret;
4958 }
4959
4960 static void rbd_sysfs_cleanup(void)
4961 {
4962         bus_unregister(&rbd_bus_type);
4963         device_unregister(&rbd_root_dev);
4964 }
4965
4966 static int rbd_slab_init(void)
4967 {
4968         rbd_assert(!rbd_img_request_cache);
4969         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
4970                                         sizeof (struct rbd_img_request),
4971                                         __alignof__(struct rbd_img_request),
4972                                         0, NULL);
4973         if (!rbd_img_request_cache)
4974                 return -ENOMEM;
4975
4976         rbd_assert(!rbd_obj_request_cache);
4977         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
4978                                         sizeof (struct rbd_obj_request),
4979                                         __alignof__(struct rbd_obj_request),
4980                                         0, NULL);
4981         if (!rbd_obj_request_cache)
4982                 goto out_err;
4983
4984         rbd_assert(!rbd_segment_name_cache);
4985         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
4986                                         MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
4987         if (rbd_segment_name_cache)
4988                 return 0;
4989 out_err:
4990         if (rbd_obj_request_cache) {
4991                 kmem_cache_destroy(rbd_obj_request_cache);
4992                 rbd_obj_request_cache = NULL;
4993         }
4994
4995         kmem_cache_destroy(rbd_img_request_cache);
4996         rbd_img_request_cache = NULL;
4997
4998         return -ENOMEM;
4999 }
5000
5001 static void rbd_slab_exit(void)
5002 {
5003         rbd_assert(rbd_segment_name_cache);
5004         kmem_cache_destroy(rbd_segment_name_cache);
5005         rbd_segment_name_cache = NULL;
5006
5007         rbd_assert(rbd_obj_request_cache);
5008         kmem_cache_destroy(rbd_obj_request_cache);
5009         rbd_obj_request_cache = NULL;
5010
5011         rbd_assert(rbd_img_request_cache);
5012         kmem_cache_destroy(rbd_img_request_cache);
5013         rbd_img_request_cache = NULL;
5014 }
5015
5016 static int __init rbd_init(void)
5017 {
5018         int rc;
5019
5020         if (!libceph_compatible(NULL)) {
5021                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5022
5023                 return -EINVAL;
5024         }
5025         rc = rbd_slab_init();
5026         if (rc)
5027                 return rc;
5028         rc = rbd_sysfs_init();
5029         if (rc)
5030                 rbd_slab_exit();
5031         else
5032                 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5033
5034         return rc;
5035 }
5036
5037 static void __exit rbd_exit(void)
5038 {
5039         rbd_sysfs_cleanup();
5040         rbd_slab_exit();
5041 }
5042
5043 module_init(rbd_init);
5044 module_exit(rbd_exit);
5045
5046 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5047 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5048 MODULE_DESCRIPTION("rados block device");
5049
5050 /* following authorship retained from original osdblk.c */
5051 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5052
5053 MODULE_LICENSE("GPL");