]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: define rbd_update_mapping_size()
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_MAX_SNAP_NAME_LEN   32
65 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
66 #define RBD_MAX_OPT_LEN         1024
67
68 #define RBD_SNAP_HEAD_NAME      "-"
69
70 #define RBD_IMAGE_ID_LEN_MAX    64
71 #define RBD_OBJ_PREFIX_LEN_MAX  64
72
73 /*
74  * An RBD device name will be "rbd#", where the "rbd" comes from
75  * RBD_DRV_NAME above, and # is a unique integer identifier.
76  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
77  * enough to hold all possible device names.
78  */
79 #define DEV_NAME_LEN            32
80 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
81
82 #define RBD_READ_ONLY_DEFAULT           false
83
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;    /* prefix for data object names (kmalloc'd) */
        u64 features;           /* feature bits; 0 for v1 images (see
                                 * rbd_header_from_disk()) */
        __u8 obj_order;         /* log2 of the object (segment) size */
        __u8 crypt_type;        /* on-disk encryption type */
        __u8 comp_type;         /* on-disk compression type */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* current image size, in bytes */
        struct ceph_snap_context *snapc;        /* snapshot context (refcounted) */
        char *snap_names;       /* snapshot name strings (kmalloc'd) */
        u64 *snap_sizes;        /* per-snapshot image sizes (kmalloc'd) */

        u64 obj_version;        /* version of the header object */
};
103
/* User-settable per-device options parsed from the map command line. */
struct rbd_options {
        bool    read_only;      /* map the device read-only
                                 * (default RBD_READ_ONLY_DEFAULT) */
};
107
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying ceph client */
        struct kref             kref;           /* one ref per sharing device */
        struct list_head        node;           /* entry on rbd_client_list */
};
116
/*
 * a request completion status
 */
struct rbd_req_status {
        int done;       /* nonzero once this sub-request has completed */
        int rc;         /* completion result code */
        u64 bytes;      /* number of bytes completed */
};
125
126 /*
127  * a collection of requests
128  */
129 struct rbd_req_coll {
130         int                     total;
131         int                     num_done;
132         struct kref             kref;
133         struct rbd_req_status   status[0];
134 };
135
/*
 * a single io request
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* request length, bytes */
        int                     coll_index;     /* slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* owning collection, or NULL */
};
147
/* In-memory record of one image snapshot, with its sysfs device. */
struct rbd_snap {
        struct  device          dev;            /* sysfs device */
        const char              *name;          /* snapshot name */
        u64                     size;           /* image size at this snap */
        struct list_head        node;           /* entry on rbd_dev->snaps */
        u64                     id;             /* snapshot id */
        u64                     features;       /* feature bits for this snap */
};
156
/*
 * What this device is currently mapped to: either the writable image
 * head or a single (read-only) snapshot.  Filled in by
 * rbd_dev_set_mapping().
 */
struct rbd_mapping {
        char                    *snap_name;     /* RBD_SNAP_HEAD_NAME or snap name */
        u64                     snap_id;        /* CEPH_NOSNAP when mapping the head */
        u64                     size;           /* mapped size, bytes */
        u64                     features;
        bool                    snap_exists;    /* true iff mapped to an existing snap */
        bool                    read_only;      /* always true for snapshots */
};
165
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_options      rbd_opts;       /* parsed map options */
        struct rbd_client       *rbd_client;    /* shared ceph client (refcounted) */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;         /* in-memory image metadata */
        char                    *image_id;      /* image id */
        size_t                  image_id_len;
        char                    *image_name;    /* user-visible image name */
        size_t                  image_name_len;
        char                    *header_name;   /* name of the header object */
        char                    *pool_name;     /* rados pool holding the image */
        int                     pool_id;

        struct ceph_osd_event   *watch_event;   /* header change notification */
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* what this device exposes */

        struct list_head        node;           /* entry on rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
208
209 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
210
211 static LIST_HEAD(rbd_dev_list);    /* devices */
212 static DEFINE_SPINLOCK(rbd_dev_list_lock);
213
214 static LIST_HEAD(rbd_client_list);              /* clients */
215 static DEFINE_SPINLOCK(rbd_client_list_lock);
216
217 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
219
220 static void rbd_dev_release(struct device *dev);
221 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
222
223 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
224                        size_t count);
225 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
226                           size_t count);
227
/* Bus attributes: /sys/bus/rbd/{add,remove}, write-only, root-only. */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

/* Empty release: rbd_root_dev is static, so there is nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent sysfs device for all rbd devices. */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
247
#ifdef RBD_DEBUG
/*
 * Report (and BUG on) a supposedly-impossible condition.  The body is
 * wrapped in do { } while (0) so the macro expands to one statement
 * and is safe inside unbraced if/else bodies.
 */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
260
261 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
262 {
263         return get_device(&rbd_dev->dev);
264 }
265
266 static void rbd_put_dev(struct rbd_device *rbd_dev)
267 {
268         put_device(&rbd_dev->dev);
269 }
270
271 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
272
273 static int rbd_open(struct block_device *bdev, fmode_t mode)
274 {
275         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
276
277         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
278                 return -EROFS;
279
280         rbd_get_dev(rbd_dev);
281         set_device_ro(bdev, rbd_dev->mapping.read_only);
282
283         return 0;
284 }
285
286 static int rbd_release(struct gendisk *disk, fmode_t mode)
287 {
288         struct rbd_device *rbd_dev = disk->private_data;
289
290         rbd_put_dev(rbd_dev);
291
292         return 0;
293 }
294
/* Block-device entry points; only open/release are implemented. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
300
301 /*
302  * Initialize an rbd client instance.
303  * We own *ceph_opts.
304  */
305 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
306 {
307         struct rbd_client *rbdc;
308         int ret = -ENOMEM;
309
310         dout("rbd_client_create\n");
311         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
312         if (!rbdc)
313                 goto out_opt;
314
315         kref_init(&rbdc->kref);
316         INIT_LIST_HEAD(&rbdc->node);
317
318         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
319
320         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
321         if (IS_ERR(rbdc->client))
322                 goto out_mutex;
323         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
324
325         ret = ceph_open_session(rbdc->client);
326         if (ret < 0)
327                 goto out_err;
328
329         spin_lock(&rbd_client_list_lock);
330         list_add_tail(&rbdc->node, &rbd_client_list);
331         spin_unlock(&rbd_client_list_lock);
332
333         mutex_unlock(&ctl_mutex);
334
335         dout("rbd_client_create created %p\n", rbdc);
336         return rbdc;
337
338 out_err:
339         ceph_destroy_client(rbdc->client);
340 out_mutex:
341         mutex_unlock(&ctl_mutex);
342         kfree(rbdc);
343 out_opt:
344         if (ceph_opts)
345                 ceph_destroy_options(ceph_opts);
346         return ERR_PTR(ret);
347 }
348
349 /*
350  * Find a ceph client with specific addr and configuration.  If
351  * found, bump its reference count.
352  */
353 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
354 {
355         struct rbd_client *client_node;
356         bool found = false;
357
358         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
359                 return NULL;
360
361         spin_lock(&rbd_client_list_lock);
362         list_for_each_entry(client_node, &rbd_client_list, node) {
363                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
364                         kref_get(&client_node->kref);
365                         found = true;
366                         break;
367                 }
368         }
369         spin_unlock(&rbd_client_list_lock);
370
371         return found ? client_node : NULL;
372 }
373
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument, values
 * between Opt_last_int and Opt_last_string take a string, and values
 * between Opt_last_string and Opt_last_bool are Boolean flags.
 * parse_rbd_opts_token() relies on this ordering.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
387
388 static match_table_t rbd_opts_tokens = {
389         /* int args above */
390         /* string args above */
391         {Opt_read_only, "mapping.read_only"},
392         {Opt_read_only, "ro"},          /* Alternate spelling */
393         {Opt_read_write, "read_write"},
394         {Opt_read_write, "rw"},         /* Alternate spelling */
395         /* Boolean args above */
396         {-1, NULL}
397 };
398
399 static int parse_rbd_opts_token(char *c, void *private)
400 {
401         struct rbd_options *rbd_opts = private;
402         substring_t argstr[MAX_OPT_ARGS];
403         int token, intval, ret;
404
405         token = match_token(c, rbd_opts_tokens, argstr);
406         if (token < 0)
407                 return -EINVAL;
408
409         if (token < Opt_last_int) {
410                 ret = match_int(&argstr[0], &intval);
411                 if (ret < 0) {
412                         pr_err("bad mount option arg (not int) "
413                                "at '%s'\n", c);
414                         return ret;
415                 }
416                 dout("got int token %d val %d\n", token, intval);
417         } else if (token > Opt_last_int && token < Opt_last_string) {
418                 dout("got string token %d val %s\n", token,
419                      argstr[0].from);
420         } else if (token > Opt_last_string && token < Opt_last_bool) {
421                 dout("got Boolean token %d\n", token);
422         } else {
423                 dout("got token %d\n", token);
424         }
425
426         switch (token) {
427         case Opt_read_only:
428                 rbd_opts->read_only = true;
429                 break;
430         case Opt_read_write:
431                 rbd_opts->read_only = false;
432                 break;
433         default:
434                 rbd_assert(false);
435                 break;
436         }
437         return 0;
438 }
439
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * Ownership of the parsed options always ends up with a client: they
 * are destroyed here when an existing client is reused, and consumed
 * (or destroyed on failure) by rbd_client_create() otherwise.
 * Returns 0 and sets rbd_dev->rbd_client on success, negative errno
 * on failure.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
                                size_t mon_addr_len, char *options)
{
        struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        ceph_opts = ceph_parse_options(options, mon_addr,
                                        mon_addr + mon_addr_len,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts))
                return PTR_ERR(ceph_opts);

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}
472
/*
 * Destroy ceph client
 *
 * Called when the last kref on the client is dropped.  Takes
 * rbd_client_list_lock itself to unlink the client from the global
 * list before tearing it down (the caller must NOT already hold the
 * lock).
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}
490
491 /*
492  * Drop reference to ceph client node. If it's not referenced anymore, release
493  * it.
494  */
495 static void rbd_put_client(struct rbd_device *rbd_dev)
496 {
497         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
498         rbd_dev->rbd_client = NULL;
499 }
500
501 /*
502  * Destroy requests collection
503  */
504 static void rbd_coll_release(struct kref *kref)
505 {
506         struct rbd_req_coll *coll =
507                 container_of(kref, struct rbd_req_coll, kref);
508
509         dout("rbd_coll_release %p\n", coll);
510         kfree(coll);
511 }
512
513 static bool rbd_image_format_valid(u32 image_format)
514 {
515         return image_format == 1 || image_format == 2;
516 }
517
/*
 * Sanity-check an on-disk (format 1) image header: verify the magic
 * header text and ensure the snapshot metadata sizes fit in a size_t
 * so later header allocations cannot overflow.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
546
547 /*
548  * Create a new header structure, translate header format from the on-disk
549  * header.
550  */
551 static int rbd_header_from_disk(struct rbd_image_header *header,
552                                  struct rbd_image_header_ondisk *ondisk)
553 {
554         u32 snap_count;
555         size_t len;
556         size_t size;
557         u32 i;
558
559         memset(header, 0, sizeof (*header));
560
561         snap_count = le32_to_cpu(ondisk->snap_count);
562
563         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
564         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
565         if (!header->object_prefix)
566                 return -ENOMEM;
567         memcpy(header->object_prefix, ondisk->object_prefix, len);
568         header->object_prefix[len] = '\0';
569
570         if (snap_count) {
571                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
572
573                 /* Save a copy of the snapshot names */
574
575                 if (snap_names_len > (u64) SIZE_MAX)
576                         return -EIO;
577                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
578                 if (!header->snap_names)
579                         goto out_err;
580                 /*
581                  * Note that rbd_dev_v1_header_read() guarantees
582                  * the ondisk buffer we're working with has
583                  * snap_names_len bytes beyond the end of the
584                  * snapshot id array, this memcpy() is safe.
585                  */
586                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
587                         snap_names_len);
588
589                 /* Record each snapshot's size */
590
591                 size = snap_count * sizeof (*header->snap_sizes);
592                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
593                 if (!header->snap_sizes)
594                         goto out_err;
595                 for (i = 0; i < snap_count; i++)
596                         header->snap_sizes[i] =
597                                 le64_to_cpu(ondisk->snaps[i].image_size);
598         } else {
599                 WARN_ON(ondisk->snap_names_len);
600                 header->snap_names = NULL;
601                 header->snap_sizes = NULL;
602         }
603
604         header->features = 0;   /* No features support in v1 images */
605         header->obj_order = ondisk->options.order;
606         header->crypt_type = ondisk->options.crypt_type;
607         header->comp_type = ondisk->options.comp_type;
608
609         /* Allocate and fill in the snapshot context */
610
611         header->image_size = le64_to_cpu(ondisk->image_size);
612         size = sizeof (struct ceph_snap_context);
613         size += snap_count * sizeof (header->snapc->snaps[0]);
614         header->snapc = kzalloc(size, GFP_KERNEL);
615         if (!header->snapc)
616                 goto out_err;
617
618         atomic_set(&header->snapc->nref, 1);
619         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
620         header->snapc->num_snaps = snap_count;
621         for (i = 0; i < snap_count; i++)
622                 header->snapc->snaps[i] =
623                         le64_to_cpu(ondisk->snaps[i].id);
624
625         return 0;
626
627 out_err:
628         kfree(header->snap_sizes);
629         header->snap_sizes = NULL;
630         kfree(header->snap_names);
631         header->snap_names = NULL;
632         kfree(header->object_prefix);
633         header->object_prefix = NULL;
634
635         return -ENOMEM;
636 }
637
638 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
639 {
640
641         struct rbd_snap *snap;
642
643         list_for_each_entry(snap, &rbd_dev->snaps, node) {
644                 if (!strcmp(snap_name, snap->name)) {
645                         rbd_dev->mapping.snap_id = snap->id;
646                         rbd_dev->mapping.size = snap->size;
647                         rbd_dev->mapping.features = snap->features;
648
649                         return 0;
650                 }
651         }
652
653         return -ENOENT;
654 }
655
656 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
657 {
658         int ret;
659
660         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
661                     sizeof (RBD_SNAP_HEAD_NAME))) {
662                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
663                 rbd_dev->mapping.size = rbd_dev->header.image_size;
664                 rbd_dev->mapping.features = rbd_dev->header.features;
665                 rbd_dev->mapping.snap_exists = false;
666                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
667                 ret = 0;
668         } else {
669                 ret = snap_by_name(rbd_dev, snap_name);
670                 if (ret < 0)
671                         goto done;
672                 rbd_dev->mapping.snap_exists = true;
673                 rbd_dev->mapping.read_only = true;
674         }
675         rbd_dev->mapping.snap_name = snap_name;
676 done:
677         return ret;
678 }
679
680 static void rbd_header_free(struct rbd_image_header *header)
681 {
682         kfree(header->object_prefix);
683         header->object_prefix = NULL;
684         kfree(header->snap_sizes);
685         header->snap_sizes = NULL;
686         kfree(header->snap_names);
687         header->snap_names = NULL;
688         ceph_put_snap_context(header->snapc);
689         header->snapc = NULL;
690 }
691
692 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
693 {
694         char *name;
695         u64 segment;
696         int ret;
697
698         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
699         if (!name)
700                 return NULL;
701         segment = offset >> rbd_dev->header.obj_order;
702         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
703                         rbd_dev->header.object_prefix, segment);
704         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
705                 pr_err("error formatting segment name for #%llu (%d)\n",
706                         segment, ret);
707                 kfree(name);
708                 name = NULL;
709         }
710
711         return name;
712 }
713
714 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
715 {
716         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
717
718         return offset & (segment_size - 1);
719 }
720
721 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
722                                 u64 offset, u64 length)
723 {
724         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
725
726         offset &= segment_size - 1;
727
728         rbd_assert(length <= U64_MAX - offset);
729         if (offset + length > segment_size)
730                 length = segment_size - offset;
731
732         return length;
733 }
734
/*
 * Number of image segments (objects) touched by the byte range
 * [ofs, ofs + len).  Returns 0 for an empty range and -ERANGE when
 * ofs + len would wrap past U64_MAX.
 *
 * NOTE(review): the u64 segment count is returned as an int; for an
 * extremely large range with a small obj_order it could be truncated
 * -- confirm callers only pass per-request extents.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg;
        u64 end_seg;

        if (!len)
                return 0;
        if (len - 1 > U64_MAX - ofs)
                return -ERANGE;

        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;

        return end_seg - start_seg + 1;
}
751
752 /*
753  * returns the size of an object in the image
754  */
755 static u64 rbd_obj_bytes(struct rbd_image_header *header)
756 {
757         return 1 << header->obj_order;
758 }
759
760 /*
761  * bio helpers
762  */
763
764 static void bio_chain_put(struct bio *chain)
765 {
766         struct bio *tmp;
767
768         while (chain) {
769                 tmp = chain;
770                 chain = chain->bi_next;
771                 bio_put(tmp);
772         }
773 }
774
/*
 * zeros a bio chain, starting at specific offset
 *
 * Zeroes every byte at or beyond byte offset start_ofs (counted from
 * the start of the whole chain) in each segment of each bio.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;    /* running byte offset within the chain */

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /* Zero from start_ofs or the segment
                                 * start, whichever comes later. */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
801
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old into a new chain until len bytes are covered,
 * splitting the final bio if it crosses the boundary.  On success
 * *old is advanced past the consumed bios and *next points at where
 * to continue (the following bio, or the second half of a split).
 * Returns the new chain, or NULL on allocation/split failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
{
        struct bio *old_chain = *old;
        struct bio *new_chain = NULL;
        struct bio *tail;       /* only read once new_chain is non-NULL */
        int total = 0;

        /* Release the bio_pair left over from a previous split, if any */
        if (*bp) {
                bio_pair_release(*bp);
                *bp = NULL;
        }

        while (old_chain && (total < len)) {
                struct bio *tmp;

                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
                gfpmask &= ~__GFP_WAIT; /* can't wait after the first */

                if (total + old_chain->bi_size > len) {
                        /*
                         * NOTE(review): this local bp shadows the bp
                         * parameter, so the caller's *bp is never set
                         * to the new pair; the split is only reachable
                         * via *next -- confirm this is intended.
                         */
                        struct bio_pair *bp;

                        /*
                         * this split can only happen with a single paged bio,
                         * split_bio will BUG_ON if this is not the case
                         */
                        dout("bio_chain_clone split! total=%d remaining=%d"
                             "bi_size=%u\n",
                             total, len - total, old_chain->bi_size);

                        /* split the bio. We'll release it either in the next
                           call, or it will have to be released outside */
                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;

                        __bio_clone(tmp, &bp->bio1);

                        *next = &bp->bio2;
                } else {
                        __bio_clone(tmp, old_chain);
                        *next = old_chain->bi_next;
                }

                /* Detach the clone and append it to the new chain */
                tmp->bi_bdev = NULL;
                tmp->bi_next = NULL;
                if (new_chain)
                        tail->bi_next = tmp;
                else
                        new_chain = tmp;
                tail = tmp;
                old_chain = old_chain->bi_next;

                total += tmp->bi_size;
        }

        /* Caller must not ask for more bytes than the chain contains */
        rbd_assert(total == len);

        *old = old_chain;

        return new_chain;

err_out:
        dout("bio_chain_clone with err\n");
        bio_chain_put(new_chain);
        return NULL;
}
876
877 /*
878  * helpers for osd request op vectors.
879  */
880 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
881                                         int opcode, u32 payload_len)
882 {
883         struct ceph_osd_req_op *ops;
884
885         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
886         if (!ops)
887                 return NULL;
888
889         ops[0].op = opcode;
890
891         /*
892          * op extent offset and length will be set later on
893          * in calc_raw_layout()
894          */
895         ops[0].payload_len = payload_len;
896
897         return ops;
898 }
899
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
904
/*
 * Record completion status for one sub-request of a collection, then
 * complete -- in submission order -- every prefix of sub-requests
 * that has now finished.  With no collection the whole blk request is
 * completed at once; with no request there is nothing to do.
 */
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        /* The queue lock serializes updates to the status array */
        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        /* Extend [min, max) over the contiguous run of finished slots */
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i<max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                /* Each completed sub-request drops one collection ref */
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
942
/* Complete the sub-request described by req using its own coll/index. */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
948
949 /*
950  * Send ceph osd request
951  */
952 static int rbd_do_request(struct request *rq,
953                           struct rbd_device *rbd_dev,
954                           struct ceph_snap_context *snapc,
955                           u64 snapid,
956                           const char *object_name, u64 ofs, u64 len,
957                           struct bio *bio,
958                           struct page **pages,
959                           int num_pages,
960                           int flags,
961                           struct ceph_osd_req_op *ops,
962                           struct rbd_req_coll *coll,
963                           int coll_index,
964                           void (*rbd_cb)(struct ceph_osd_request *req,
965                                          struct ceph_msg *msg),
966                           struct ceph_osd_request **linger_req,
967                           u64 *ver)
968 {
969         struct ceph_osd_request *req;
970         struct ceph_file_layout *layout;
971         int ret;
972         u64 bno;
973         struct timespec mtime = CURRENT_TIME;
974         struct rbd_request *req_data;
975         struct ceph_osd_request_head *reqhead;
976         struct ceph_osd_client *osdc;
977
978         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
979         if (!req_data) {
980                 if (coll)
981                         rbd_coll_end_req_index(rq, coll, coll_index,
982                                                -ENOMEM, len);
983                 return -ENOMEM;
984         }
985
986         if (coll) {
987                 req_data->coll = coll;
988                 req_data->coll_index = coll_index;
989         }
990
991         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
992                 (unsigned long long) ofs, (unsigned long long) len);
993
994         osdc = &rbd_dev->rbd_client->client->osdc;
995         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
996                                         false, GFP_NOIO, pages, bio);
997         if (!req) {
998                 ret = -ENOMEM;
999                 goto done_pages;
1000         }
1001
1002         req->r_callback = rbd_cb;
1003
1004         req_data->rq = rq;
1005         req_data->bio = bio;
1006         req_data->pages = pages;
1007         req_data->len = len;
1008
1009         req->r_priv = req_data;
1010
1011         reqhead = req->r_request->front.iov_base;
1012         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1013
1014         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1015         req->r_oid_len = strlen(req->r_oid);
1016
1017         layout = &req->r_file_layout;
1018         memset(layout, 0, sizeof(*layout));
1019         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020         layout->fl_stripe_count = cpu_to_le32(1);
1021         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1022         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1023         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1024                                    req, ops);
1025         rbd_assert(ret == 0);
1026
1027         ceph_osdc_build_request(req, ofs, &len,
1028                                 ops,
1029                                 snapc,
1030                                 &mtime,
1031                                 req->r_oid, req->r_oid_len);
1032
1033         if (linger_req) {
1034                 ceph_osdc_set_request_linger(osdc, req);
1035                 *linger_req = req;
1036         }
1037
1038         ret = ceph_osdc_start_request(osdc, req, false);
1039         if (ret < 0)
1040                 goto done_err;
1041
1042         if (!rbd_cb) {
1043                 ret = ceph_osdc_wait_request(osdc, req);
1044                 if (ver)
1045                         *ver = le64_to_cpu(req->r_reassert_version.version);
1046                 dout("reassert_ver=%llu\n",
1047                         (unsigned long long)
1048                                 le64_to_cpu(req->r_reassert_version.version));
1049                 ceph_osdc_put_request(req);
1050         }
1051         return ret;
1052
1053 done_err:
1054         bio_chain_put(req_data->bio);
1055         ceph_osdc_put_request(req);
1056 done_pages:
1057         rbd_coll_end_req(req_data, ret, len);
1058         kfree(req_data);
1059         return ret;
1060 }
1061
1062 /*
1063  * Ceph osd op callback
1064  */
1065 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1066 {
1067         struct rbd_request *req_data = req->r_priv;
1068         struct ceph_osd_reply_head *replyhead;
1069         struct ceph_osd_op *op;
1070         __s32 rc;
1071         u64 bytes;
1072         int read_op;
1073
1074         /* parse reply */
1075         replyhead = msg->front.iov_base;
1076         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1077         op = (void *)(replyhead + 1);
1078         rc = le32_to_cpu(replyhead->result);
1079         bytes = le64_to_cpu(op->extent.length);
1080         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1081
1082         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1083                 (unsigned long long) bytes, read_op, (int) rc);
1084
1085         if (rc == -ENOENT && read_op) {
1086                 zero_bio_chain(req_data->bio, 0);
1087                 rc = 0;
1088         } else if (rc == 0 && read_op && bytes < req_data->len) {
1089                 zero_bio_chain(req_data->bio, bytes);
1090                 bytes = req_data->len;
1091         }
1092
1093         rbd_coll_end_req(req_data, rc, bytes);
1094
1095         if (req_data->bio)
1096                 bio_chain_put(req_data->bio);
1097
1098         ceph_osdc_put_request(req);
1099         kfree(req_data);
1100 }
1101
/*
 * Minimal completion callback for requests that need no result
 * processing (e.g. notify acks): just drop the request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1106
1107 /*
1108  * Do a synchronous ceph osd operation
1109  */
1110 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1111                            struct ceph_snap_context *snapc,
1112                            u64 snapid,
1113                            int flags,
1114                            struct ceph_osd_req_op *ops,
1115                            const char *object_name,
1116                            u64 ofs, u64 inbound_size,
1117                            char *inbound,
1118                            struct ceph_osd_request **linger_req,
1119                            u64 *ver)
1120 {
1121         int ret;
1122         struct page **pages;
1123         int num_pages;
1124
1125         rbd_assert(ops != NULL);
1126
1127         num_pages = calc_pages_for(ofs, inbound_size);
1128         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1129         if (IS_ERR(pages))
1130                 return PTR_ERR(pages);
1131
1132         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1133                           object_name, ofs, inbound_size, NULL,
1134                           pages, num_pages,
1135                           flags,
1136                           ops,
1137                           NULL, 0,
1138                           NULL,
1139                           linger_req, ver);
1140         if (ret < 0)
1141                 goto done;
1142
1143         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1144                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1145
1146 done:
1147         ceph_release_page_vector(pages, num_pages);
1148         return ret;
1149 }
1150
1151 /*
1152  * Do an asynchronous ceph osd operation
1153  */
1154 static int rbd_do_op(struct request *rq,
1155                      struct rbd_device *rbd_dev,
1156                      struct ceph_snap_context *snapc,
1157                      u64 snapid,
1158                      int opcode, int flags,
1159                      u64 ofs, u64 len,
1160                      struct bio *bio,
1161                      struct rbd_req_coll *coll,
1162                      int coll_index)
1163 {
1164         char *seg_name;
1165         u64 seg_ofs;
1166         u64 seg_len;
1167         int ret;
1168         struct ceph_osd_req_op *ops;
1169         u32 payload_len;
1170
1171         seg_name = rbd_segment_name(rbd_dev, ofs);
1172         if (!seg_name)
1173                 return -ENOMEM;
1174         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1176
1177         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1178
1179         ret = -ENOMEM;
1180         ops = rbd_create_rw_ops(1, opcode, payload_len);
1181         if (!ops)
1182                 goto done;
1183
1184         /* we've taken care of segment sizes earlier when we
1185            cloned the bios. We should never have a segment
1186            truncated at this point */
1187         rbd_assert(seg_len == len);
1188
1189         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1190                              seg_name, seg_ofs, seg_len,
1191                              bio,
1192                              NULL, 0,
1193                              flags,
1194                              ops,
1195                              coll, coll_index,
1196                              rbd_req_cb, 0, NULL);
1197
1198         rbd_destroy_ops(ops);
1199 done:
1200         kfree(seg_name);
1201         return ret;
1202 }
1203
1204 /*
1205  * Request async osd write
1206  */
1207 static int rbd_req_write(struct request *rq,
1208                          struct rbd_device *rbd_dev,
1209                          struct ceph_snap_context *snapc,
1210                          u64 ofs, u64 len,
1211                          struct bio *bio,
1212                          struct rbd_req_coll *coll,
1213                          int coll_index)
1214 {
1215         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1216                          CEPH_OSD_OP_WRITE,
1217                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218                          ofs, len, bio, coll, coll_index);
1219 }
1220
1221 /*
1222  * Request async osd read
1223  */
1224 static int rbd_req_read(struct request *rq,
1225                          struct rbd_device *rbd_dev,
1226                          u64 snapid,
1227                          u64 ofs, u64 len,
1228                          struct bio *bio,
1229                          struct rbd_req_coll *coll,
1230                          int coll_index)
1231 {
1232         return rbd_do_op(rq, rbd_dev, NULL,
1233                          snapid,
1234                          CEPH_OSD_OP_READ,
1235                          CEPH_OSD_FLAG_READ,
1236                          ofs, len, bio, coll, coll_index);
1237 }
1238
1239 /*
1240  * Request sync osd read
1241  */
1242 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1243                           u64 snapid,
1244                           const char *object_name,
1245                           u64 ofs, u64 len,
1246                           char *buf,
1247                           u64 *ver)
1248 {
1249         struct ceph_osd_req_op *ops;
1250         int ret;
1251
1252         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1253         if (!ops)
1254                 return -ENOMEM;
1255
1256         ret = rbd_req_sync_op(rbd_dev, NULL,
1257                                snapid,
1258                                CEPH_OSD_FLAG_READ,
1259                                ops, object_name, ofs, len, buf, NULL, ver);
1260         rbd_destroy_ops(ops);
1261
1262         return ret;
1263 }
1264
1265 /*
1266  * Request sync osd watch
1267  */
1268 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1269                                    u64 ver,
1270                                    u64 notify_id)
1271 {
1272         struct ceph_osd_req_op *ops;
1273         int ret;
1274
1275         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1276         if (!ops)
1277                 return -ENOMEM;
1278
1279         ops[0].watch.ver = cpu_to_le64(ver);
1280         ops[0].watch.cookie = notify_id;
1281         ops[0].watch.flag = 0;
1282
1283         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1284                           rbd_dev->header_name, 0, 0, NULL,
1285                           NULL, 0,
1286                           CEPH_OSD_FLAG_READ,
1287                           ops,
1288                           NULL, 0,
1289                           rbd_simple_req_cb, 0, NULL);
1290
1291         rbd_destroy_ops(ops);
1292         return ret;
1293 }
1294
1295 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296 {
1297         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1298         u64 hver;
1299         int rc;
1300
1301         if (!rbd_dev)
1302                 return;
1303
1304         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305                 rbd_dev->header_name, (unsigned long long) notify_id,
1306                 (unsigned int) opcode);
1307         rc = rbd_refresh_header(rbd_dev, &hver);
1308         if (rc)
1309                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310                            " update snaps: %d\n", rbd_dev->major, rc);
1311
1312         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1313 }
1314
1315 /*
1316  * Request sync osd watch
1317  */
1318 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1319 {
1320         struct ceph_osd_req_op *ops;
1321         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1322         int ret;
1323
1324         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1325         if (!ops)
1326                 return -ENOMEM;
1327
1328         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1329                                      (void *)rbd_dev, &rbd_dev->watch_event);
1330         if (ret < 0)
1331                 goto fail;
1332
1333         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1334         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1335         ops[0].watch.flag = 1;
1336
1337         ret = rbd_req_sync_op(rbd_dev, NULL,
1338                               CEPH_NOSNAP,
1339                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1340                               ops,
1341                               rbd_dev->header_name,
1342                               0, 0, NULL,
1343                               &rbd_dev->watch_request, NULL);
1344
1345         if (ret < 0)
1346                 goto fail_event;
1347
1348         rbd_destroy_ops(ops);
1349         return 0;
1350
1351 fail_event:
1352         ceph_osdc_cancel_event(rbd_dev->watch_event);
1353         rbd_dev->watch_event = NULL;
1354 fail:
1355         rbd_destroy_ops(ops);
1356         return ret;
1357 }
1358
1359 /*
1360  * Request sync osd unwatch
1361  */
1362 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1363 {
1364         struct ceph_osd_req_op *ops;
1365         int ret;
1366
1367         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1368         if (!ops)
1369                 return -ENOMEM;
1370
1371         ops[0].watch.ver = 0;
1372         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1373         ops[0].watch.flag = 0;
1374
1375         ret = rbd_req_sync_op(rbd_dev, NULL,
1376                               CEPH_NOSNAP,
1377                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378                               ops,
1379                               rbd_dev->header_name,
1380                               0, 0, NULL, NULL, NULL);
1381
1382
1383         rbd_destroy_ops(ops);
1384         ceph_osdc_cancel_event(rbd_dev->watch_event);
1385         rbd_dev->watch_event = NULL;
1386         return ret;
1387 }
1388
1389 /*
1390  * Synchronous osd object method call
1391  */
1392 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1393                              const char *object_name,
1394                              const char *class_name,
1395                              const char *method_name,
1396                              const char *outbound,
1397                              size_t outbound_size,
1398                              char *inbound,
1399                              size_t inbound_size,
1400                              int flags,
1401                              u64 *ver)
1402 {
1403         struct ceph_osd_req_op *ops;
1404         int class_name_len = strlen(class_name);
1405         int method_name_len = strlen(method_name);
1406         int payload_size;
1407         int ret;
1408
1409         /*
1410          * Any input parameters required by the method we're calling
1411          * will be sent along with the class and method names as
1412          * part of the message payload.  That data and its size are
1413          * supplied via the indata and indata_len fields (named from
1414          * the perspective of the server side) in the OSD request
1415          * operation.
1416          */
1417         payload_size = class_name_len + method_name_len + outbound_size;
1418         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1419         if (!ops)
1420                 return -ENOMEM;
1421
1422         ops[0].cls.class_name = class_name;
1423         ops[0].cls.class_len = (__u8) class_name_len;
1424         ops[0].cls.method_name = method_name;
1425         ops[0].cls.method_len = (__u8) method_name_len;
1426         ops[0].cls.argc = 0;
1427         ops[0].cls.indata = outbound;
1428         ops[0].cls.indata_len = outbound_size;
1429
1430         ret = rbd_req_sync_op(rbd_dev, NULL,
1431                                CEPH_NOSNAP,
1432                                flags, ops,
1433                                object_name, 0, inbound_size, inbound,
1434                                NULL, ver);
1435
1436         rbd_destroy_ops(ops);
1437
1438         dout("cls_exec returned %d\n", ret);
1439         return ret;
1440 }
1441
1442 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1443 {
1444         struct rbd_req_coll *coll =
1445                         kzalloc(sizeof(struct rbd_req_coll) +
1446                                 sizeof(struct rbd_req_status) * num_reqs,
1447                                 GFP_ATOMIC);
1448
1449         if (!coll)
1450                 return NULL;
1451         coll->total = num_reqs;
1452         kref_init(&coll->kref);
1453         return coll;
1454 }
1455
1456 /*
1457  * block device queue callback
1458  */
1459 static void rbd_rq_fn(struct request_queue *q)
1460 {
1461         struct rbd_device *rbd_dev = q->queuedata;
1462         struct request *rq;
1463         struct bio_pair *bp = NULL;
1464
1465         while ((rq = blk_fetch_request(q))) {
1466                 struct bio *bio;
1467                 struct bio *rq_bio, *next_bio = NULL;
1468                 bool do_write;
1469                 unsigned int size;
1470                 u64 op_size = 0;
1471                 u64 ofs;
1472                 int num_segs, cur_seg = 0;
1473                 struct rbd_req_coll *coll;
1474                 struct ceph_snap_context *snapc;
1475
1476                 dout("fetched request\n");
1477
1478                 /* filter out block requests we don't understand */
1479                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1480                         __blk_end_request_all(rq, 0);
1481                         continue;
1482                 }
1483
1484                 /* deduce our operation (read, write) */
1485                 do_write = (rq_data_dir(rq) == WRITE);
1486
1487                 size = blk_rq_bytes(rq);
1488                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1489                 rq_bio = rq->bio;
1490                 if (do_write && rbd_dev->mapping.read_only) {
1491                         __blk_end_request_all(rq, -EROFS);
1492                         continue;
1493                 }
1494
1495                 spin_unlock_irq(q->queue_lock);
1496
1497                 down_read(&rbd_dev->header_rwsem);
1498
1499                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1500                                 !rbd_dev->mapping.snap_exists) {
1501                         up_read(&rbd_dev->header_rwsem);
1502                         dout("request for non-existent snapshot");
1503                         spin_lock_irq(q->queue_lock);
1504                         __blk_end_request_all(rq, -ENXIO);
1505                         continue;
1506                 }
1507
1508                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1509
1510                 up_read(&rbd_dev->header_rwsem);
1511
1512                 dout("%s 0x%x bytes at 0x%llx\n",
1513                      do_write ? "write" : "read",
1514                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1515
1516                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1517                 if (num_segs <= 0) {
1518                         spin_lock_irq(q->queue_lock);
1519                         __blk_end_request_all(rq, num_segs);
1520                         ceph_put_snap_context(snapc);
1521                         continue;
1522                 }
1523                 coll = rbd_alloc_coll(num_segs);
1524                 if (!coll) {
1525                         spin_lock_irq(q->queue_lock);
1526                         __blk_end_request_all(rq, -ENOMEM);
1527                         ceph_put_snap_context(snapc);
1528                         continue;
1529                 }
1530
1531                 do {
1532                         /* a bio clone to be passed down to OSD req */
1533                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1535                         kref_get(&coll->kref);
1536                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1537                                               op_size, GFP_ATOMIC);
1538                         if (!bio) {
1539                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1540                                                        -ENOMEM, op_size);
1541                                 goto next_seg;
1542                         }
1543
1544
1545                         /* init OSD command: write or read */
1546                         if (do_write)
1547                                 rbd_req_write(rq, rbd_dev,
1548                                               snapc,
1549                                               ofs,
1550                                               op_size, bio,
1551                                               coll, cur_seg);
1552                         else
1553                                 rbd_req_read(rq, rbd_dev,
1554                                              rbd_dev->mapping.snap_id,
1555                                              ofs,
1556                                              op_size, bio,
1557                                              coll, cur_seg);
1558
1559 next_seg:
1560                         size -= op_size;
1561                         ofs += op_size;
1562
1563                         cur_seg++;
1564                         rq_bio = next_bio;
1565                 } while (size > 0);
1566                 kref_put(&coll->kref, rbd_coll_release);
1567
1568                 if (bp)
1569                         bio_pair_release(bp);
1570                 spin_lock_irq(q->queue_lock);
1571
1572                 ceph_put_snap_context(snapc);
1573         }
1574 }
1575
1576 /*
1577  * a queue callback. Makes sure that we don't create a bio that spans across
1578  * multiple osd objects. One exception would be with a single page bios,
1579  * which we handle later at bio_chain_clone
1580  */
1581 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1582                           struct bio_vec *bvec)
1583 {
1584         struct rbd_device *rbd_dev = q->queuedata;
1585         unsigned int chunk_sectors;
1586         sector_t sector;
1587         unsigned int bio_sectors;
1588         int max;
1589
1590         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1591         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1592         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1593
1594         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1595                                  + bio_sectors)) << SECTOR_SHIFT;
1596         if (max < 0)
1597                 max = 0; /* bio_add cannot handle a negative return */
1598         if (max <= bvec->bv_len && bio_sectors == 0)
1599                 return bvec->bv_len;
1600         return max;
1601 }
1602
1603 static void rbd_free_disk(struct rbd_device *rbd_dev)
1604 {
1605         struct gendisk *disk = rbd_dev->disk;
1606
1607         if (!disk)
1608                 return;
1609
1610         if (disk->flags & GENHD_FL_UP)
1611                 del_gendisk(disk);
1612         if (disk->queue)
1613                 blk_cleanup_queue(disk->queue);
1614         put_disk(disk);
1615 }
1616
1617 /*
1618  * Read the complete header for the given rbd device.
1619  *
1620  * Returns a pointer to a dynamically-allocated buffer containing
1621  * the complete and validated header.  Caller can pass the address
1622  * of a variable that will be filled in with the version of the
1623  * header object at the time it was read.
1624  *
1625  * Returns a pointer-coded errno if a failure occurs.
1626  */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* Free the previous (too small) attempt, if any */
                kfree(ondisk);

                /* Size the buffer from the counts seen last iteration */
                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
                                rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                rbd_dev->image_name);
                        goto out_err;
                }

                /* Retry if the snapshot count changed while we read */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
1688
1689 /*
1690  * reload the ondisk the header
1691  */
1692 static int rbd_read_header(struct rbd_device *rbd_dev,
1693                            struct rbd_image_header *header)
1694 {
1695         struct rbd_image_header_ondisk *ondisk;
1696         u64 ver = 0;
1697         int ret;
1698
1699         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1700         if (IS_ERR(ondisk))
1701                 return PTR_ERR(ondisk);
1702         ret = rbd_header_from_disk(header, ondisk);
1703         if (ret >= 0)
1704                 header->obj_version = ver;
1705         kfree(ondisk);
1706
1707         return ret;
1708 }
1709
/*
 * Remove every snapshot device of @rbd_dev.  Uses the _safe list
 * iterator because each entry is unlinked as it is destroyed.
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                __rbd_remove_snap_dev(snap);
}
1718
1719 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1720 {
1721         sector_t size;
1722
1723         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1724                 return;
1725
1726         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1727         dout("setting size to %llu sectors", (unsigned long long) size);
1728         rbd_dev->mapping.size = (u64) size;
1729         set_capacity(rbd_dev->disk, size);
1730 }
1731
1732 /*
1733  * only read the first part of the ondisk header, without the snaps info
1734  */
1735 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1736 {
1737         int ret;
1738         struct rbd_image_header h;
1739
1740         ret = rbd_read_header(rbd_dev, &h);
1741         if (ret < 0)
1742                 return ret;
1743
1744         down_write(&rbd_dev->header_rwsem);
1745
1746         /* Update image size, and check for resize of mapped image */
1747         rbd_dev->header.image_size = h.image_size;
1748         rbd_update_mapping_size(rbd_dev);
1749
1750         /* rbd_dev->header.object_prefix shouldn't change */
1751         kfree(rbd_dev->header.snap_sizes);
1752         kfree(rbd_dev->header.snap_names);
1753         /* osd requests may still refer to snapc */
1754         ceph_put_snap_context(rbd_dev->header.snapc);
1755
1756         if (hver)
1757                 *hver = h.obj_version;
1758         rbd_dev->header.obj_version = h.obj_version;
1759         rbd_dev->header.image_size = h.image_size;
1760         rbd_dev->header.snapc = h.snapc;
1761         rbd_dev->header.snap_names = h.snap_names;
1762         rbd_dev->header.snap_sizes = h.snap_sizes;
1763         /* Free the extra copy of the object prefix */
1764         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1765         kfree(h.object_prefix);
1766
1767         ret = rbd_dev_snaps_update(rbd_dev);
1768         if (!ret)
1769                 ret = rbd_dev_snaps_register(rbd_dev);
1770
1771         up_write(&rbd_dev->header_rwsem);
1772
1773         return ret;
1774 }
1775
1776 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1777 {
1778         int ret;
1779
1780         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1781         ret = __rbd_refresh_header(rbd_dev, hver);
1782         mutex_unlock(&ctl_mutex);
1783
1784         return ret;
1785 }
1786
/*
 * Allocate and configure the gendisk and request queue for a mapped
 * rbd device, and set the initial capacity from the mapping size.
 * The disk is not added to the system here.  Returns 0 on success
 * or -ENOMEM.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        u64 segment_size;

        /* create gendisk info */
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        /* Keep single bios within one object (see rbd_merge_bvec) */
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        return 0;
out_disk:
        put_disk(disk);

        return -ENOMEM;
}
1835
1836 /*
1837   sysfs
1838 */
1839
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1844
1845 static ssize_t rbd_size_show(struct device *dev,
1846                              struct device_attribute *attr, char *buf)
1847 {
1848         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849         sector_t size;
1850
1851         down_read(&rbd_dev->header_rwsem);
1852         size = get_capacity(rbd_dev->disk);
1853         up_read(&rbd_dev->header_rwsem);
1854
1855         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1856 }
1857
1858 /*
1859  * Note this shows the features for whatever's mapped, which is not
1860  * necessarily the base image.
1861  */
1862 static ssize_t rbd_features_show(struct device *dev,
1863                              struct device_attribute *attr, char *buf)
1864 {
1865         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1866
1867         return sprintf(buf, "0x%016llx\n",
1868                         (unsigned long long) rbd_dev->mapping.features);
1869 }
1870
1871 static ssize_t rbd_major_show(struct device *dev,
1872                               struct device_attribute *attr, char *buf)
1873 {
1874         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1875
1876         return sprintf(buf, "%d\n", rbd_dev->major);
1877 }
1878
1879 static ssize_t rbd_client_id_show(struct device *dev,
1880                                   struct device_attribute *attr, char *buf)
1881 {
1882         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1883
1884         return sprintf(buf, "client%lld\n",
1885                         ceph_client_id(rbd_dev->rbd_client->client));
1886 }
1887
1888 static ssize_t rbd_pool_show(struct device *dev,
1889                              struct device_attribute *attr, char *buf)
1890 {
1891         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1892
1893         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1894 }
1895
1896 static ssize_t rbd_pool_id_show(struct device *dev,
1897                              struct device_attribute *attr, char *buf)
1898 {
1899         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1900
1901         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1902 }
1903
1904 static ssize_t rbd_name_show(struct device *dev,
1905                              struct device_attribute *attr, char *buf)
1906 {
1907         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1908
1909         return sprintf(buf, "%s\n", rbd_dev->image_name);
1910 }
1911
1912 static ssize_t rbd_image_id_show(struct device *dev,
1913                              struct device_attribute *attr, char *buf)
1914 {
1915         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1916
1917         return sprintf(buf, "%s\n", rbd_dev->image_id);
1918 }
1919
1920 /*
1921  * Shows the name of the currently-mapped snapshot (or
1922  * RBD_SNAP_HEAD_NAME for the base image).
1923  */
1924 static ssize_t rbd_snap_show(struct device *dev,
1925                              struct device_attribute *attr,
1926                              char *buf)
1927 {
1928         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1929
1930         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1931 }
1932
1933 static ssize_t rbd_image_refresh(struct device *dev,
1934                                  struct device_attribute *attr,
1935                                  const char *buf,
1936                                  size_t size)
1937 {
1938         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1939         int ret;
1940
1941         ret = rbd_refresh_header(rbd_dev, NULL);
1942
1943         return ret < 0 ? ret : size;
1944 }
1945
/* Attributes exposed under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
1979
/*
 * Intentionally empty release method: rbd_bus_add_dev() installs
 * rbd_dev_release() as the device's ->release, so teardown happens
 * there.  NOTE(review): confirm which release callback the driver
 * core actually invokes when both dev->release and type->release
 * are set.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1989
1990
1991 /*
1992   sysfs - snapshots
1993 */
1994
1995 static ssize_t rbd_snap_size_show(struct device *dev,
1996                                   struct device_attribute *attr,
1997                                   char *buf)
1998 {
1999         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2000
2001         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2002 }
2003
2004 static ssize_t rbd_snap_id_show(struct device *dev,
2005                                 struct device_attribute *attr,
2006                                 char *buf)
2007 {
2008         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2009
2010         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2011 }
2012
2013 static ssize_t rbd_snap_features_show(struct device *dev,
2014                                 struct device_attribute *attr,
2015                                 char *buf)
2016 {
2017         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2018
2019         return sprintf(buf, "0x%016llx\n",
2020                         (unsigned long long) snap->features);
2021 }
2022
/* Per-snapshot attributes, exposed under the snap_<name> device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2037
2038 static void rbd_snap_dev_release(struct device *dev)
2039 {
2040         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2041         kfree(snap->name);
2042         kfree(snap);
2043 }
2044
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/* Snapshot devices free themselves via rbd_snap_dev_release() */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2054
2055 static bool rbd_snap_registered(struct rbd_snap *snap)
2056 {
2057         bool ret = snap->dev.type == &rbd_snap_device_type;
2058         bool reg = device_is_registered(&snap->dev);
2059
2060         rbd_assert(!ret ^ reg);
2061
2062         return ret;
2063 }
2064
/*
 * Unlink a snapshot from the device's snapshot list and, if its
 * sysfs device was registered, unregister it.  Unregistering drops
 * the device reference, which frees the rbd_snap via its release
 * callback.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2071
2072 static int rbd_register_snap_dev(struct rbd_snap *snap,
2073                                   struct device *parent)
2074 {
2075         struct device *dev = &snap->dev;
2076         int ret;
2077
2078         dev->type = &rbd_snap_device_type;
2079         dev->parent = parent;
2080         dev->release = rbd_snap_dev_release;
2081         dev_set_name(dev, "snap_%s", snap->name);
2082         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2083
2084         ret = device_register(dev);
2085
2086         return ret;
2087 }
2088
2089 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2090                                                 const char *snap_name,
2091                                                 u64 snap_id, u64 snap_size,
2092                                                 u64 snap_features)
2093 {
2094         struct rbd_snap *snap;
2095         int ret;
2096
2097         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2098         if (!snap)
2099                 return ERR_PTR(-ENOMEM);
2100
2101         ret = -ENOMEM;
2102         snap->name = kstrdup(snap_name, GFP_KERNEL);
2103         if (!snap->name)
2104                 goto err;
2105
2106         snap->id = snap_id;
2107         snap->size = snap_size;
2108         snap->features = snap_features;
2109
2110         return snap;
2111
2112 err:
2113         kfree(snap->name);
2114         kfree(snap);
2115
2116         return ERR_PTR(ret);
2117 }
2118
2119 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2120                 u64 *snap_size, u64 *snap_features)
2121 {
2122         char *snap_name;
2123
2124         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2125
2126         *snap_size = rbd_dev->header.snap_sizes[which];
2127         *snap_features = 0;     /* No features for v1 */
2128
2129         /* Skip over names until we find the one we are looking for */
2130
2131         snap_name = rbd_dev->header.snap_names;
2132         while (which--)
2133                 snap_name += strlen(snap_name) + 1;
2134
2135         return snap_name;
2136 }
2137
2138 /*
2139  * Get the size and object order for an image snapshot, or if
2140  * snap_id is CEPH_NOSNAP, gets this information for the base
2141  * image.
2142  */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        /* Reply layout of the "get_size" class method: an order byte
         * followed by a little-endian 64-bit size. */
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long) snap_id, (unsigned int) *order,
                (unsigned long long) *snap_size);

        return 0;
}
2171
/* Fetch object order and image size for the base image (CEPH_NOSNAP). */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2178
/*
 * Fetch the object name prefix for a format 2 image via the
 * "get_object_prefix" class method and store it in the header.
 * Returns rbd_req_sync_exec()'s (non-negative) result on success,
 * or a negative errno on failure.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        /* NOTE(review): GFP_NOIO here but GFP_KERNEL for reply_buf
         * above -- confirm whether this path can run during I/O;
         * otherwise the two could be made consistent. */
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2215
/*
 * Fetch the feature bits for the given snapshot (or the base image
 * if snap_id is CEPH_NOSNAP) via the "get_features" class method.
 * Only the first mask is returned through *snap_features; the
 * "incompat" mask is logged but otherwise ignored here.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features)
{
        __le64 snapid = cpu_to_le64(snap_id);
        /* Reply layout: two little-endian 64-bit feature masks */
        struct {
                __le64 features;
                __le64 incompat;
        } features_buf = { 0 };
        int ret;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
        *snap_features = le64_to_cpu(features_buf.features);

        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
                (unsigned long long) snap_id,
                (unsigned long long) *snap_features,
                (unsigned long long) le64_to_cpu(features_buf.incompat));

        return 0;
}
2243
/* Fetch the base image's feature bits into the header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2249
2250 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2251 {
2252         size_t size;
2253         int ret;
2254         void *reply_buf;
2255         void *p;
2256         void *end;
2257         u64 seq;
2258         u32 snap_count;
2259         struct ceph_snap_context *snapc;
2260         u32 i;
2261
2262         /*
2263          * We'll need room for the seq value (maximum snapshot id),
2264          * snapshot count, and array of that many snapshot ids.
2265          * For now we have a fixed upper limit on the number we're
2266          * prepared to receive.
2267          */
2268         size = sizeof (__le64) + sizeof (__le32) +
2269                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2270         reply_buf = kzalloc(size, GFP_KERNEL);
2271         if (!reply_buf)
2272                 return -ENOMEM;
2273
2274         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2275                                 "rbd", "get_snapcontext",
2276                                 NULL, 0,
2277                                 reply_buf, size,
2278                                 CEPH_OSD_FLAG_READ, ver);
2279         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2280         if (ret < 0)
2281                 goto out;
2282
2283         ret = -ERANGE;
2284         p = reply_buf;
2285         end = (char *) reply_buf + size;
2286         ceph_decode_64_safe(&p, end, seq, out);
2287         ceph_decode_32_safe(&p, end, snap_count, out);
2288
2289         /*
2290          * Make sure the reported number of snapshot ids wouldn't go
2291          * beyond the end of our buffer.  But before checking that,
2292          * make sure the computed size of the snapshot context we
2293          * allocate is representable in a size_t.
2294          */
2295         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2296                                  / sizeof (u64)) {
2297                 ret = -EINVAL;
2298                 goto out;
2299         }
2300         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2301                 goto out;
2302
2303         size = sizeof (struct ceph_snap_context) +
2304                                 snap_count * sizeof (snapc->snaps[0]);
2305         snapc = kmalloc(size, GFP_KERNEL);
2306         if (!snapc) {
2307                 ret = -ENOMEM;
2308                 goto out;
2309         }
2310
2311         atomic_set(&snapc->nref, 1);
2312         snapc->seq = seq;
2313         snapc->num_snaps = snap_count;
2314         for (i = 0; i < snap_count; i++)
2315                 snapc->snaps[i] = ceph_decode_64(&p);
2316
2317         rbd_dev->header.snapc = snapc;
2318
2319         dout("  snap context seq = %llu, snap_count = %u\n",
2320                 (unsigned long long) seq, (unsigned int) snap_count);
2321
2322 out:
2323         kfree(reply_buf);
2324
2325         return 0;
2326 }
2327
/*
 * Look up the name of the snapshot at index "which" in the snapshot
 * context via the "get_snapshot_name" class method.  Returns a
 * GFP_KERNEL-allocated string the caller must free, or an ERR_PTR()
 * on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        size_t snap_name_len;
        char *snap_name;

        /* Room for a length prefix plus the maximum-length name */
        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name_len = 0;      /* filled in by the decode but unused after */
        snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
                                GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout("  snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
2374
2375 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2376                 u64 *snap_size, u64 *snap_features)
2377 {
2378         __le64 snap_id;
2379         u8 order;
2380         int ret;
2381
2382         snap_id = rbd_dev->header.snapc->snaps[which];
2383         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2384         if (ret)
2385                 return ERR_PTR(ret);
2386         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2387         if (ret)
2388                 return ERR_PTR(ret);
2389
2390         return rbd_dev_v2_snap_name(rbd_dev, which);
2391 }
2392
2393 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2394                 u64 *snap_size, u64 *snap_features)
2395 {
2396         if (rbd_dev->image_format == 1)
2397                 return rbd_dev_v1_snap_info(rbd_dev, which,
2398                                         snap_size, snap_features);
2399         if (rbd_dev->image_format == 2)
2400                 return rbd_dev_v2_snap_info(rbd_dev, which,
2401                                         snap_size, snap_features);
2402         return ERR_PTR(-EINVAL);
2403 }
2404
2405 /*
2406  * Scan the rbd device's current snapshot list and compare it to the
2407  * newly-received snapshot context.  Remove any existing snapshots
2408  * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
2410  * And verify there are no changes to snapshots we already know
2411  * about.
2412  *
2413  * Assumes the snapshots in the snapshot context are sorted by
2414  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2415  * are also maintained in that order.)
2416  */
2417 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2418 {
2419         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2420         const u32 snap_count = snapc->num_snaps;
2421         struct list_head *head = &rbd_dev->snaps;
2422         struct list_head *links = head->next;
2423         u32 index = 0;
2424
2425         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2426         while (index < snap_count || links != head) {
2427                 u64 snap_id;
2428                 struct rbd_snap *snap;
2429                 char *snap_name;
2430                 u64 snap_size = 0;
2431                 u64 snap_features = 0;
2432
2433                 snap_id = index < snap_count ? snapc->snaps[index]
2434                                              : CEPH_NOSNAP;
2435                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2436                                      : NULL;
2437                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2438
2439                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2440                         struct list_head *next = links->next;
2441
2442                         /* Existing snapshot not in the new snap context */
2443
2444                         if (rbd_dev->mapping.snap_id == snap->id)
2445                                 rbd_dev->mapping.snap_exists = false;
2446                         __rbd_remove_snap_dev(snap);
2447                         dout("%ssnap id %llu has been removed\n",
2448                                 rbd_dev->mapping.snap_id == snap->id ?
2449                                                                 "mapped " : "",
2450                                 (unsigned long long) snap->id);
2451
2452                         /* Done with this list entry; advance */
2453
2454                         links = next;
2455                         continue;
2456                 }
2457
2458                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2459                                         &snap_size, &snap_features);
2460                 if (IS_ERR(snap_name))
2461                         return PTR_ERR(snap_name);
2462
2463                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2464                         (unsigned long long) snap_id);
2465                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2466                         struct rbd_snap *new_snap;
2467
2468                         /* We haven't seen this snapshot before */
2469
2470                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2471                                         snap_id, snap_size, snap_features);
2472                         if (IS_ERR(new_snap)) {
2473                                 int err = PTR_ERR(new_snap);
2474
2475                                 dout("  failed to add dev, error %d\n", err);
2476
2477                                 return err;
2478                         }
2479
2480                         /* New goes before existing, or at end of list */
2481
2482                         dout("  added dev%s\n", snap ? "" : " at end\n");
2483                         if (snap)
2484                                 list_add_tail(&new_snap->node, &snap->node);
2485                         else
2486                                 list_add_tail(&new_snap->node, head);
2487                 } else {
2488                         /* Already have this one */
2489
2490                         dout("  already present\n");
2491
2492                         rbd_assert(snap->size == snap_size);
2493                         rbd_assert(!strcmp(snap->name, snap_name));
2494                         rbd_assert(snap->features == snap_features);
2495
2496                         /* Done with this list entry; advance */
2497
2498                         links = links->next;
2499                 }
2500
2501                 /* Advance to the next entry in the snapshot context */
2502
2503                 index++;
2504         }
2505         dout("%s: done\n", __func__);
2506
2507         return 0;
2508 }
2509
2510 /*
2511  * Scan the list of snapshots and register the devices for any that
2512  * have not already been registered.
2513  */
2514 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2515 {
2516         struct rbd_snap *snap;
2517         int ret = 0;
2518
2519         dout("%s called\n", __func__);
2520         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2521                 return -EIO;
2522
2523         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2524                 if (!rbd_snap_registered(snap)) {
2525                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2526                         if (ret < 0)
2527                                 break;
2528                 }
2529         }
2530         dout("%s: returning %d\n", __func__, ret);
2531
2532         return ret;
2533 }
2534
/*
 * Register the rbd_dev's struct device on the rbd bus, parented to
 * the rbd root device and named by its numeric device id.  The
 * registration is serialized by ctl_mutex.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        mutex_unlock(&ctl_mutex);

        return ret;
}
2554
/* Unregister the device; rbd_dev_release() performs the teardown. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2559
/*
 * Establish the watch on the image header object.  If the watch
 * request fails with -ERANGE, the header is refreshed and the watch
 * retried; any other result is returned to the caller.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
        int ret, rc;

        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
                        rc = rbd_refresh_header(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
        } while (ret == -ERANGE);

        return ret;
}
2575
2576 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2577
2578 /*
2579  * Get a unique rbd identifier for the given new rbd_dev, and add
2580  * the rbd_dev to the global list.  The minimum rbd id is 1.
2581  */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        /* Atomically claim the next id, then publish on the global list */
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
2592
2593 /*
2594  * Remove an rbd_dev from the global list, and record that its
2595  * identifier is no longer in use.
2596  */
2597 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2598 {
2599         struct list_head *tmp;
2600         int rbd_id = rbd_dev->dev_id;
2601         int max_id;
2602
2603         rbd_assert(rbd_id > 0);
2604
2605         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2606                 (unsigned long long) rbd_dev->dev_id);
2607         spin_lock(&rbd_dev_list_lock);
2608         list_del_init(&rbd_dev->node);
2609
2610         /*
2611          * If the id being "put" is not the current maximum, there
2612          * is nothing special we need to do.
2613          */
2614         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2615                 spin_unlock(&rbd_dev_list_lock);
2616                 return;
2617         }
2618
2619         /*
2620          * We need to update the current maximum id.  Search the
2621          * list to find out what it is.  We're more likely to find
2622          * the maximum at the end, so search the list backward.
2623          */
2624         max_id = 0;
2625         list_for_each_prev(tmp, &rbd_dev_list) {
2626                 struct rbd_device *rbd_dev;
2627
2628                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2629                 if (rbd_id > max_id)
2630                         max_id = rbd_id;
2631         }
2632         spin_unlock(&rbd_dev_list_lock);
2633
2634         /*
2635          * The max id could have been updated by rbd_dev_id_get(), in
2636          * which case it now accurately reflects the new maximum.
2637          * Be careful not to overwrite the maximum value in that
2638          * case.
2639          */
2640         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2641         dout("  max dev id has been reset\n");
2642 }
2643
2644 /*
2645  * Skips over white space at *buf, and updates *buf to point to the
2646  * first found non-space character (if any). Returns the length of
2647  * the token (string of non-white space characters) found.  Note
2648  * that *buf must be terminated with '\0'.
2649  */
/*
 * Skip leading whitespace at *buf, leaving *buf pointing at the
 * first non-space character (or the terminating NUL).  Returns the
 * length of the token that follows (zero if none).
 */
static inline size_t next_token(const char **buf)
{
        /* Characters for which isspace() is nonzero in "C"/"POSIX" */
        static const char spaces[] = " \f\n\r\t\v";

        *buf += strspn(*buf, spaces);

        return strcspn(*buf, spaces);
}
2662
/*
 * Extract the next whitespace-delimited token from *buf and, when
 * the supplied buffer is big enough, copy it there; the copy, if
 * made, is always NUL-terminated.  *buf must be NUL-terminated on
 * entry and is advanced past the token even when the token is too
 * large to copy.
 *
 * Returns the token length (not counting the '\0'): 0 means no
 * token was found, and a value >= token_size means it didn't fit.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';	/* guarantee termination */
	}
	*buf += len;	/* consume the token regardless of the copy */

	return len;
}
2692
2693 /*
2694  * Finds the next token in *buf, dynamically allocates a buffer big
2695  * enough to hold a copy of it, and copies the token into the new
2696  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2697  * that a duplicate buffer is created even for a zero-length token.
2698  *
2699  * Returns a pointer to the newly-allocated duplicate, or a null
2700  * pointer if memory for the duplicate was not available.  If
2701  * the lenp argument is a non-null pointer, the length of the token
2702  * (not including the '\0') is returned in *lenp.
2703  *
2704  * If successful, the *buf pointer will be updated to point beyond
2705  * the end of the found token.
2706  *
2707  * Note: uses GFP_KERNEL for allocation.
2708  */
2709 static inline char *dup_token(const char **buf, size_t *lenp)
2710 {
2711         char *dup;
2712         size_t len;
2713
2714         len = next_token(buf);
2715         dup = kmalloc(len + 1, GFP_KERNEL);
2716         if (!dup)
2717                 return NULL;
2718
2719         memcpy(dup, *buf, len);
2720         *(dup + len) = '\0';
2721         *buf += len;
2722
2723         if (lenp)
2724                 *lenp = len;
2725
2726         return dup;
2727 }
2728
/*
 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
 * rbd_md_name, and name fields of the given rbd_dev, based on the
 * list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
 * copy of the snapshot name to map if successful, or a
 * pointer-coded error otherwise.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
				const char *buf,
				const char **mon_addrs,
				size_t *mon_addrs_size,
				char *options,
				size_t options_size)
{
	size_t len;
	char *err_ptr = ERR_PTR(-EINVAL);
	char *snap_name;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return err_ptr;
	/*
	 * Monitor addresses are returned in place (pointing into the
	 * caller's buffer, not copied); the reported size includes
	 * room for a terminating '\0'.
	 */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	/* The options token must exist and fit (with '\0') in options */
	if (!len || len >= options_size)
		return err_ptr;

	/* Any failure from here on is an allocation failure */
	err_ptr = ERR_PTR(-ENOMEM);
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Snapshot name is optional */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	}
	/* Caller owns (and must eventually free) the returned copy */
	snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_err;
	memcpy(snap_name, buf, len);
	*(snap_name + len) = '\0';

dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);

	return snap_name;

out_err:
	/* Roll back partially-populated rbd_dev name fields */
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return err_ptr;
}
2798
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	/* sizeof () includes the '\0', so the name fits with terminator */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Invoke the "get_id" class method on the id object */
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* Decode the length-prefixed string into a fresh allocation */
	p = response;
	rbd_dev->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->image_id_len,
						GFP_NOIO);
	if (IS_ERR(rbd_dev->image_id)) {
		/* Leave image_id NULL on error, per the contract above */
		ret = PTR_ERR(rbd_dev->image_id);
		rbd_dev->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
2867
/*
 * Probe the given rbd_dev as a format 1 image: record an empty
 * image id (v1 images have none), derive the header object name
 * from the image name, and read the on-disk header into
 * rbd_dev->header.
 *
 * Returns 0 on success or a negative errno; on error the image_id
 * and header_name fields are freed and reset to NULL.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->image_id)
		return -ENOMEM;
	rbd_dev->image_id_len = 0;

	/* Record the header object name for this rbd image. */

	/* sizeof () includes the '\0', so the name fits with terminator */
	size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;
	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	/* Roll back anything set up above so the fields stay NULL */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_id);
	rbd_dev->image_id = NULL;

	return ret;
}
2910
/*
 * Probe the given rbd_dev as a format 2 image: derive the header
 * object name from the (already-recorded) image id, then fetch the
 * image size/order, object prefix, features, and snapshot context.
 *
 * NOTE: format 2 images are not yet fully supported, so even when
 * all of the metadata is gathered successfully this deliberately
 * returns -ENOTSUPP to the caller.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	/* sizeof () includes the '\0', so the name fits with terminator */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	/* v2 mapping is not supported yet; report that even on success */
	return -ENOTSUPP;
out_err:
	/* Roll back the allocations made by this probe */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
2972
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to read the image id object first.  A format 1 image
	 * has no such object, so a lookup failure (e.g. ENOENT)
	 * means we should assume a version 1 image.
	 */
	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
2997
/*
 * Handle a write to /sys/bus/rbd/add: parse the user-supplied
 * specification in buf, connect to the monitor cluster, probe the
 * image, and create and announce the corresponding block device.
 *
 * Returns count (all input consumed) on success, or a negative
 * errno on failure.  Error cleanup unwinds in reverse order of
 * setup via the goto ladder at the bottom.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;
	char *snap_name;

	/* Hold a module reference for the lifetime of the device */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* The options token can be at most as long as the input itself */
	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_out_mem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_out_mem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* parse add command */
	snap_name = rbd_add_parse_args(rbd_dev, buf,
				&mon_addrs, &mon_addrs_size, options, count);
	if (IS_ERR(snap_name)) {
		rc = PTR_ERR(snap_name);
		goto err_out_mem;
	}

	/* mon_addrs_size includes the '\0'; pass the bare length */
	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
	if (rc < 0)
		goto err_out_args;

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* Determine image format (1 or 2) and read its metadata */
	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_client;
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = rbd_dev_snaps_update(rbd_dev);
	if (rc)
		goto err_out_header;

	rc = rbd_dev_set_mapping(rbd_dev, snap_name);
	if (rc)
		goto err_out_header;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_id;
	rbd_dev->major = rc;

	/* Set up the blkdev mapping. */

	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_header:
	rbd_header_free(&rbd_dev->header);
err_out_client:
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev);
	kfree(rbd_dev->image_id);
err_out_args:
	kfree(rbd_dev->mapping.snap_name);
	kfree(rbd_dev->image_name);
	kfree(rbd_dev->pool_name);
err_out_mem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
3140
3141 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3142 {
3143         struct list_head *tmp;
3144         struct rbd_device *rbd_dev;
3145
3146         spin_lock(&rbd_dev_list_lock);
3147         list_for_each(tmp, &rbd_dev_list) {
3148                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3149                 if (rbd_dev->dev_id == dev_id) {
3150                         spin_unlock(&rbd_dev_list_lock);
3151                         return rbd_dev;
3152                 }
3153         }
3154         spin_unlock(&rbd_dev_list_lock);
3155         return NULL;
3156 }
3157
/*
 * Release callback for an rbd device, run once its last reference
 * is dropped: tear down any watch request/event, drop the client
 * reference, free the disk and block device number, release the
 * header fields, and finally free the rbd_dev itself along with
 * its id and the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching for header changes before tearing anything down */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->mapping.snap_name);
	kfree(rbd_dev->image_id);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
3192
/*
 * Handle a write to /sys/bus/rbd/remove: parse the target device
 * id from buf and, if a matching device exists, remove its
 * snapshots and delete the device.  The bulk of the teardown is
 * then performed by rbd_dev_release() via rbd_bus_del_dev().
 *
 * Returns count on success, -ENOENT if no device has the given id,
 * or another negative errno for a malformed id.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	/* Serialize against other control operations */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
3227
3228 /*
3229  * create control files in sysfs
3230  * /sys/bus/rbd/...
3231  */
3232 static int rbd_sysfs_init(void)
3233 {
3234         int ret;
3235
3236         ret = device_register(&rbd_root_dev);
3237         if (ret < 0)
3238                 return ret;
3239
3240         ret = bus_register(&rbd_bus_type);
3241         if (ret < 0)
3242                 device_unregister(&rbd_root_dev);
3243
3244         return ret;
3245 }
3246
/*
 * Remove the sysfs control files, undoing rbd_sysfs_init() in the
 * reverse order of registration: bus first, then the root device.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
3252
3253 int __init rbd_init(void)
3254 {
3255         int rc;
3256
3257         rc = rbd_sysfs_init();
3258         if (rc)
3259                 return rc;
3260         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3261         return 0;
3262 }
3263
/* Module exit: remove the /sys/bus/rbd control files. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
3268
3269 module_init(rbd_init);
3270 module_exit(rbd_exit);
3271
3272 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3273 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3274 MODULE_DESCRIPTION("rados block device");
3275
3276 /* following authorship retained from original osdblk.c */
3277 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3278
3279 MODULE_LICENSE("GPL");