]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: increase maximum snapshot name length
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/*
 * Snapshot sysfs device names get this prefix; the name itself is
 * capped so that the full "snap_<name>" still fits in NAME_MAX.
 */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

/* Pseudo snapshot name used when mapping the base image (no snapshot) */
#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
/* Enough decimal digits for any int: roughly bits * log10(2), plus one */
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* NUL-terminated copy of the on-disk prefix */
	u64 features;		/* feature bits; 0 for format 1 images */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* total image size, in bytes */
	struct ceph_snap_context *snapc;	/* snapshot id array + seq */
	char *snap_names;	/* raw snapshot-name blob from on-disk header */
	u64 *snap_sizes;	/* per-snapshot image size, one per snap */

	u64 obj_version;
};
114
/* rbd-specific options parsed out of the "add" string (see rbd_opts_tokens) */
struct rbd_options {
	bool	read_only;	/* map the device read-only */
};
118
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;	/* released via rbd_client_release() */
	struct list_head	node;	/* entry in rbd_client_list */
};
127
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* byte count recorded at completion */
};
136
/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;		/* number of sub-requests */
	int			num_done;	/* completed (in order) so far */
	struct kref		kref;		/* freed via rbd_coll_release() */
	struct rbd_req_status	status[0];	/* one slot per sub-request */
};
146
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* parent collection; may be NULL */
};
158
/* In-memory record of one image snapshot, exposed as a sysfs device. */
struct rbd_snap {
	struct	device		dev;	/* sysfs device */
	const char		*name;
	u64			size;
	struct list_head	node;	/* entry in rbd_dev->snaps */
	u64			id;
	u64			features;
};
167
/* What this device is currently mapped to (base image or one snapshot). */
struct rbd_mapping {
	char			*snap_name;	/* RBD_SNAP_HEAD_NAME for base */
	u64			snap_id;	/* CEPH_NOSNAP for base image */
	u64			size;
	u64			features;
	bool			snap_exists;	/* true iff mapped to a snapshot */
	bool			read_only;	/* always true for snapshots */
};
176
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_options	rbd_opts;
	struct rbd_client	*rbd_client;	/* shared, refcounted client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_id;
	size_t			image_id_len;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;	/* name of the header object */
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
219
220 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
221
222 static LIST_HEAD(rbd_dev_list);    /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
224
225 static LIST_HEAD(rbd_client_list);              /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
227
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
230
231 static void rbd_dev_release(struct device *dev);
232 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
233
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
235                        size_t count);
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
237                           size_t count);
238
/* sysfs bus attributes: write-only "add" and "remove" control files */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
249
/* No-op release: rbd_root_dev is statically allocated, never freed. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Root device registered for the rbd driver. */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
258
#ifdef RBD_DEBUG
/*
 * Debug assertion: report the failed expression and BUG().
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement; the previous bare-if form broke `if (x) rbd_assert(y);
 * else ...` (the else would bind to the macro's hidden if).
 */
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n"	\
						"\trbd_assert(%s);\n\n",	\
						__func__, __LINE__, #expr);	\
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
271
/* Take a reference on the rbd device's embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
276
/* Drop a reference taken with rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
281
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
284
/*
 * Block device open: reject writable opens of read-only mappings,
 * otherwise pin the rbd device for the duration of the open.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);	/* dropped in rbd_release() */
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}
297
/* Block device release: drop the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
306
/* Block device operations; rbd only needs open/release hooks. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
312
313 /*
314  * Initialize an rbd client instance.
315  * We own *ceph_opts.
316  */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 {
319         struct rbd_client *rbdc;
320         int ret = -ENOMEM;
321
322         dout("rbd_client_create\n");
323         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
324         if (!rbdc)
325                 goto out_opt;
326
327         kref_init(&rbdc->kref);
328         INIT_LIST_HEAD(&rbdc->node);
329
330         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331
332         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333         if (IS_ERR(rbdc->client))
334                 goto out_mutex;
335         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336
337         ret = ceph_open_session(rbdc->client);
338         if (ret < 0)
339                 goto out_err;
340
341         spin_lock(&rbd_client_list_lock);
342         list_add_tail(&rbdc->node, &rbd_client_list);
343         spin_unlock(&rbd_client_list_lock);
344
345         mutex_unlock(&ctl_mutex);
346
347         dout("rbd_client_create created %p\n", rbdc);
348         return rbdc;
349
350 out_err:
351         ceph_destroy_client(rbdc->client);
352 out_mutex:
353         mutex_unlock(&ctl_mutex);
354         kfree(rbdc);
355 out_opt:
356         if (ceph_opts)
357                 ceph_destroy_options(ceph_opts);
358         return ERR_PTR(ret);
359 }
360
361 /*
362  * Find a ceph client with specific addr and configuration.  If
363  * found, bump its reference count.
364  */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
366 {
367         struct rbd_client *client_node;
368         bool found = false;
369
370         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
371                 return NULL;
372
373         spin_lock(&rbd_client_list_lock);
374         list_for_each_entry(client_node, &rbd_client_list, node) {
375                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376                         kref_get(&client_node->kref);
377                         found = true;
378                         break;
379                 }
380         }
381         spin_unlock(&rbd_client_list_lock);
382
383         return found ? client_node : NULL;
384 }
385
386 /*
387  * mount options
388  */
/*
 * mount options
 *
 * Tokens below Opt_last_int take an int argument, tokens between
 * Opt_last_int and Opt_last_string a string argument, and tokens
 * between Opt_last_string and Opt_last_bool are Boolean flags.
 * Currently only Boolean options exist.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

/* Accepted spellings, matched by parse_rbd_opts_token() */
static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
410
/*
 * Parse one rbd option token and update the rbd_options passed via
 * "private".  Returns 0 on success, -EINVAL for an unrecognized
 * token, or the match_int() error for a malformed int argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Classify the token by its position in the enum ranges */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* match_token() returned a value not in the table */
		rbd_assert(false);
		break;
	}
	return 0;
}
451
452 /*
453  * Get a ceph client with specific addr and configuration, if one does
454  * not exist create it.
455  */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
				size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	/* rbd-specific options are consumed via parse_rbd_opts_token() */
	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client; it did not take ceph_opts */
		ceph_destroy_options(ceph_opts);
	} else {
		/* rbd_client_create() owns ceph_opts from here on */
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}
484
/*
 * Destroy ceph client
 *
 * This takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT hold rbd_client_list_lock when the last reference
 * is dropped.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* Unlink from the global client list before tearing down */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
502
503 /*
504  * Drop reference to ceph client node. If it's not referenced anymore, release
505  * it.
506  */
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against reuse after put */
}
512
513 /*
514  * Destroy requests collection
515  */
516 static void rbd_coll_release(struct kref *kref)
517 {
518         struct rbd_req_coll *coll =
519                 container_of(kref, struct rbd_req_coll, kref);
520
521         dout("rbd_coll_release %p\n", coll);
522         kfree(coll);
523 }
524
525 static bool rbd_image_format_valid(u32 image_format)
526 {
527         return image_format == 1 || image_format == 2;
528 }
529
/* Sanity-check an on-disk (format 1) image header before trusting it. */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
568
569 /*
570  * Create a new header structure, translate header format from the on-disk
571  * header.
572  */
573 static int rbd_header_from_disk(struct rbd_image_header *header,
574                                  struct rbd_image_header_ondisk *ondisk)
575 {
576         u32 snap_count;
577         size_t len;
578         size_t size;
579         u32 i;
580
581         memset(header, 0, sizeof (*header));
582
583         snap_count = le32_to_cpu(ondisk->snap_count);
584
585         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
586         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
587         if (!header->object_prefix)
588                 return -ENOMEM;
589         memcpy(header->object_prefix, ondisk->object_prefix, len);
590         header->object_prefix[len] = '\0';
591
592         if (snap_count) {
593                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
594
595                 /* Save a copy of the snapshot names */
596
597                 if (snap_names_len > (u64) SIZE_MAX)
598                         return -EIO;
599                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
600                 if (!header->snap_names)
601                         goto out_err;
602                 /*
603                  * Note that rbd_dev_v1_header_read() guarantees
604                  * the ondisk buffer we're working with has
605                  * snap_names_len bytes beyond the end of the
606                  * snapshot id array, this memcpy() is safe.
607                  */
608                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
609                         snap_names_len);
610
611                 /* Record each snapshot's size */
612
613                 size = snap_count * sizeof (*header->snap_sizes);
614                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
615                 if (!header->snap_sizes)
616                         goto out_err;
617                 for (i = 0; i < snap_count; i++)
618                         header->snap_sizes[i] =
619                                 le64_to_cpu(ondisk->snaps[i].image_size);
620         } else {
621                 WARN_ON(ondisk->snap_names_len);
622                 header->snap_names = NULL;
623                 header->snap_sizes = NULL;
624         }
625
626         header->features = 0;   /* No features support in v1 images */
627         header->obj_order = ondisk->options.order;
628         header->crypt_type = ondisk->options.crypt_type;
629         header->comp_type = ondisk->options.comp_type;
630
631         /* Allocate and fill in the snapshot context */
632
633         header->image_size = le64_to_cpu(ondisk->image_size);
634         size = sizeof (struct ceph_snap_context);
635         size += snap_count * sizeof (header->snapc->snaps[0]);
636         header->snapc = kzalloc(size, GFP_KERNEL);
637         if (!header->snapc)
638                 goto out_err;
639
640         atomic_set(&header->snapc->nref, 1);
641         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
642         header->snapc->num_snaps = snap_count;
643         for (i = 0; i < snap_count; i++)
644                 header->snapc->snaps[i] =
645                         le64_to_cpu(ondisk->snaps[i].id);
646
647         return 0;
648
649 out_err:
650         kfree(header->snap_sizes);
651         header->snap_sizes = NULL;
652         kfree(header->snap_names);
653         header->snap_names = NULL;
654         kfree(header->object_prefix);
655         header->object_prefix = NULL;
656
657         return -ENOMEM;
658 }
659
660 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
661 {
662
663         struct rbd_snap *snap;
664
665         list_for_each_entry(snap, &rbd_dev->snaps, node) {
666                 if (!strcmp(snap_name, snap->name)) {
667                         rbd_dev->mapping.snap_id = snap->id;
668                         rbd_dev->mapping.size = snap->size;
669                         rbd_dev->mapping.features = snap->features;
670
671                         return 0;
672                 }
673         }
674
675         return -ENOENT;
676 }
677
/*
 * Initialize rbd_dev->mapping for the given snapshot name.  The
 * special name RBD_SNAP_HEAD_NAME maps the base image (writable per
 * the read_only option); any other name maps a snapshot, which is
 * always read-only.  On success the snap_name pointer is retained
 * (not copied) in the mapping.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
	int ret;

	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* Mapping the base image, not a snapshot */
		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		rbd_dev->mapping.snap_exists = false;
		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.snap_exists = true;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->mapping.snap_name = snap_name;
done:
	return ret;
}
701
/* Free all dynamically allocated parts of an in-memory image header. */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
713
/*
 * Build the object name for the segment containing image byte
 * "offset": "<object_prefix>.<segment number as 12 hex digits>".
 * Returns a kmalloc'd string the caller must free, or NULL on
 * allocation or formatting failure.
 */
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		/* a truncated name would address the wrong object */
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
735
736 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
737 {
738         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
739
740         return offset & (segment_size - 1);
741 }
742
743 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
744                                 u64 offset, u64 length)
745 {
746         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
747
748         offset &= segment_size - 1;
749
750         rbd_assert(length <= U64_MAX - offset);
751         if (offset + length > segment_size)
752                 length = segment_size - offset;
753
754         return length;
755 }
756
757 static int rbd_get_num_segments(struct rbd_image_header *header,
758                                 u64 ofs, u64 len)
759 {
760         u64 start_seg;
761         u64 end_seg;
762
763         if (!len)
764                 return 0;
765         if (len - 1 > U64_MAX - ofs)
766                 return -ERANGE;
767
768         start_seg = ofs >> header->obj_order;
769         end_seg = (ofs + len - 1) >> header->obj_order;
770
771         return end_seg - start_seg + 1;
772 }
773
774 /*
775  * returns the size of an object in the image
776  */
777 static u64 rbd_obj_bytes(struct rbd_image_header *header)
778 {
779         return 1 << header->obj_order;
780 }
781
782 /*
783  * bio helpers
784  */
785
786 static void bio_chain_put(struct bio *chain)
787 {
788         struct bio *tmp;
789
790         while (chain) {
791                 tmp = chain;
792                 chain = chain->bi_next;
793                 bio_put(tmp);
794         }
795 }
796
797 /*
798  * zeros a bio chain, starting at specific offset
799  */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; all bytes at or
 * beyond start_ofs (counted from the start of the chain) are cleared
 * through a temporary atomic kmap of each page.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs, or the whole segment */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
823
824 /*
825  * bio_chain_clone - clone a chain of bios up to a certain length.
826  * might return a bio_pair that will need to be released.
827  */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return *old points at the first unconsumed original bio and
 * *next at the continuation point (the second half of a split bio,
 * or the next original bio).  Returns the cloned chain, or NULL on
 * failure (any partially built clone chain is released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail;
	int total = 0;

	/* Release any split left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			/*
			 * NOTE(review): this local shadows the bp
			 * parameter, and the split pair is never
			 * stored through *bp — verify the pair is
			 * not leaked by the caller's contract.
			 */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		if (new_chain)
			tail->bi_next = tmp;	/* tail is valid once new_chain is set */
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
898
899 /*
900  * helpers for osd request op vectors.
901  */
902 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
903                                         int opcode, u32 payload_len)
904 {
905         struct ceph_osd_req_op *ops;
906
907         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
908         if (!ops)
909                 return NULL;
910
911         ops[0].op = opcode;
912
913         /*
914          * op extent offset and length will be set later on
915          * in calc_raw_layout()
916          */
917         ops[0].payload_len = payload_len;
918
919         return ops;
920 }
921
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
926
/*
 * Record completion of the sub-request at "index" in the collection,
 * then complete (in order) any leading run of finished sub-requests
 * against the block layer.  Without a collection the whole request
 * is completed at once.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* single-shot request: no collection bookkeeping needed */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* the queue lock serializes updates to the collection state */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* advance past the contiguous run of completed sub-requests */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed sub-request drops one collection ref */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
964
/* Complete the collection slot associated with a single rbd request. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
970
971 /*
972  * Send ceph osd request
973  */
974 static int rbd_do_request(struct request *rq,
975                           struct rbd_device *rbd_dev,
976                           struct ceph_snap_context *snapc,
977                           u64 snapid,
978                           const char *object_name, u64 ofs, u64 len,
979                           struct bio *bio,
980                           struct page **pages,
981                           int num_pages,
982                           int flags,
983                           struct ceph_osd_req_op *ops,
984                           struct rbd_req_coll *coll,
985                           int coll_index,
986                           void (*rbd_cb)(struct ceph_osd_request *req,
987                                          struct ceph_msg *msg),
988                           struct ceph_osd_request **linger_req,
989                           u64 *ver)
990 {
991         struct ceph_osd_request *req;
992         struct ceph_file_layout *layout;
993         int ret;
994         u64 bno;
995         struct timespec mtime = CURRENT_TIME;
996         struct rbd_request *req_data;
997         struct ceph_osd_request_head *reqhead;
998         struct ceph_osd_client *osdc;
999
1000         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1001         if (!req_data) {
1002                 if (coll)
1003                         rbd_coll_end_req_index(rq, coll, coll_index,
1004                                                -ENOMEM, len);
1005                 return -ENOMEM;
1006         }
1007
1008         if (coll) {
1009                 req_data->coll = coll;
1010                 req_data->coll_index = coll_index;
1011         }
1012
1013         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
1014                 (unsigned long long) ofs, (unsigned long long) len);
1015
1016         osdc = &rbd_dev->rbd_client->client->osdc;
1017         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1018                                         false, GFP_NOIO, pages, bio);
1019         if (!req) {
1020                 ret = -ENOMEM;
1021                 goto done_pages;
1022         }
1023
1024         req->r_callback = rbd_cb;
1025
1026         req_data->rq = rq;
1027         req_data->bio = bio;
1028         req_data->pages = pages;
1029         req_data->len = len;
1030
1031         req->r_priv = req_data;
1032
1033         reqhead = req->r_request->front.iov_base;
1034         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1035
1036         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1037         req->r_oid_len = strlen(req->r_oid);
1038
1039         layout = &req->r_file_layout;
1040         memset(layout, 0, sizeof(*layout));
1041         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1042         layout->fl_stripe_count = cpu_to_le32(1);
1043         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1044         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1045         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1046                                    req, ops);
1047         rbd_assert(ret == 0);
1048
1049         ceph_osdc_build_request(req, ofs, &len,
1050                                 ops,
1051                                 snapc,
1052                                 &mtime,
1053                                 req->r_oid, req->r_oid_len);
1054
1055         if (linger_req) {
1056                 ceph_osdc_set_request_linger(osdc, req);
1057                 *linger_req = req;
1058         }
1059
1060         ret = ceph_osdc_start_request(osdc, req, false);
1061         if (ret < 0)
1062                 goto done_err;
1063
1064         if (!rbd_cb) {
1065                 ret = ceph_osdc_wait_request(osdc, req);
1066                 if (ver)
1067                         *ver = le64_to_cpu(req->r_reassert_version.version);
1068                 dout("reassert_ver=%llu\n",
1069                         (unsigned long long)
1070                                 le64_to_cpu(req->r_reassert_version.version));
1071                 ceph_osdc_put_request(req);
1072         }
1073         return ret;
1074
1075 done_err:
1076         bio_chain_put(req_data->bio);
1077         ceph_osdc_put_request(req);
1078 done_pages:
1079         rbd_coll_end_req(req_data, ret, len);
1080         kfree(req_data);
1081         return ret;
1082 }
1083
1084 /*
1085  * Ceph osd op callback
1086  */
1087 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1088 {
1089         struct rbd_request *req_data = req->r_priv;
1090         struct ceph_osd_reply_head *replyhead;
1091         struct ceph_osd_op *op;
1092         __s32 rc;
1093         u64 bytes;
1094         int read_op;
1095
1096         /* parse reply */
1097         replyhead = msg->front.iov_base;
1098         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1099         op = (void *)(replyhead + 1);
1100         rc = le32_to_cpu(replyhead->result);
1101         bytes = le64_to_cpu(op->extent.length);
1102         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1103
1104         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1105                 (unsigned long long) bytes, read_op, (int) rc);
1106
1107         if (rc == -ENOENT && read_op) {
1108                 zero_bio_chain(req_data->bio, 0);
1109                 rc = 0;
1110         } else if (rc == 0 && read_op && bytes < req_data->len) {
1111                 zero_bio_chain(req_data->bio, bytes);
1112                 bytes = req_data->len;
1113         }
1114
1115         rbd_coll_end_req(req_data, rc, bytes);
1116
1117         if (req_data->bio)
1118                 bio_chain_put(req_data->bio);
1119
1120         ceph_osdc_put_request(req);
1121         kfree(req_data);
1122 }
1123
/*
 * Minimal completion callback: just drop the request reference.
 * Used for fire-and-forget requests such as notify acks.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1128
1129 /*
1130  * Do a synchronous ceph osd operation
1131  */
1132 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1133                            struct ceph_snap_context *snapc,
1134                            u64 snapid,
1135                            int flags,
1136                            struct ceph_osd_req_op *ops,
1137                            const char *object_name,
1138                            u64 ofs, u64 inbound_size,
1139                            char *inbound,
1140                            struct ceph_osd_request **linger_req,
1141                            u64 *ver)
1142 {
1143         int ret;
1144         struct page **pages;
1145         int num_pages;
1146
1147         rbd_assert(ops != NULL);
1148
1149         num_pages = calc_pages_for(ofs, inbound_size);
1150         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1151         if (IS_ERR(pages))
1152                 return PTR_ERR(pages);
1153
1154         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1155                           object_name, ofs, inbound_size, NULL,
1156                           pages, num_pages,
1157                           flags,
1158                           ops,
1159                           NULL, 0,
1160                           NULL,
1161                           linger_req, ver);
1162         if (ret < 0)
1163                 goto done;
1164
1165         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1166                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1167
1168 done:
1169         ceph_release_page_vector(pages, num_pages);
1170         return ret;
1171 }
1172
1173 /*
1174  * Do an asynchronous ceph osd operation
1175  */
1176 static int rbd_do_op(struct request *rq,
1177                      struct rbd_device *rbd_dev,
1178                      struct ceph_snap_context *snapc,
1179                      u64 ofs, u64 len,
1180                      struct bio *bio,
1181                      struct rbd_req_coll *coll,
1182                      int coll_index)
1183 {
1184         char *seg_name;
1185         u64 seg_ofs;
1186         u64 seg_len;
1187         int ret;
1188         struct ceph_osd_req_op *ops;
1189         u32 payload_len;
1190         int opcode;
1191         int flags;
1192         u64 snapid;
1193
1194         seg_name = rbd_segment_name(rbd_dev, ofs);
1195         if (!seg_name)
1196                 return -ENOMEM;
1197         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1198         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1199
1200         if (rq_data_dir(rq) == WRITE) {
1201                 opcode = CEPH_OSD_OP_WRITE;
1202                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1203                 snapid = CEPH_NOSNAP;
1204                 payload_len = seg_len;
1205         } else {
1206                 opcode = CEPH_OSD_OP_READ;
1207                 flags = CEPH_OSD_FLAG_READ;
1208                 snapc = NULL;
1209                 snapid = rbd_dev->mapping.snap_id;
1210                 payload_len = 0;
1211         }
1212
1213         ret = -ENOMEM;
1214         ops = rbd_create_rw_ops(1, opcode, payload_len);
1215         if (!ops)
1216                 goto done;
1217
1218         /* we've taken care of segment sizes earlier when we
1219            cloned the bios. We should never have a segment
1220            truncated at this point */
1221         rbd_assert(seg_len == len);
1222
1223         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1224                              seg_name, seg_ofs, seg_len,
1225                              bio,
1226                              NULL, 0,
1227                              flags,
1228                              ops,
1229                              coll, coll_index,
1230                              rbd_req_cb, 0, NULL);
1231
1232         rbd_destroy_ops(ops);
1233 done:
1234         kfree(seg_name);
1235         return ret;
1236 }
1237
1238 /*
1239  * Request sync osd read
1240  */
1241 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1242                           u64 snapid,
1243                           const char *object_name,
1244                           u64 ofs, u64 len,
1245                           char *buf,
1246                           u64 *ver)
1247 {
1248         struct ceph_osd_req_op *ops;
1249         int ret;
1250
1251         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1252         if (!ops)
1253                 return -ENOMEM;
1254
1255         ret = rbd_req_sync_op(rbd_dev, NULL,
1256                                snapid,
1257                                CEPH_OSD_FLAG_READ,
1258                                ops, object_name, ofs, len, buf, NULL, ver);
1259         rbd_destroy_ops(ops);
1260
1261         return ret;
1262 }
1263
1264 /*
1265  * Request sync osd watch
1266  */
1267 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1268                                    u64 ver,
1269                                    u64 notify_id)
1270 {
1271         struct ceph_osd_req_op *ops;
1272         int ret;
1273
1274         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1275         if (!ops)
1276                 return -ENOMEM;
1277
1278         ops[0].watch.ver = cpu_to_le64(ver);
1279         ops[0].watch.cookie = notify_id;
1280         ops[0].watch.flag = 0;
1281
1282         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1283                           rbd_dev->header_name, 0, 0, NULL,
1284                           NULL, 0,
1285                           CEPH_OSD_FLAG_READ,
1286                           ops,
1287                           NULL, 0,
1288                           rbd_simple_req_cb, 0, NULL);
1289
1290         rbd_destroy_ops(ops);
1291         return ret;
1292 }
1293
1294 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1295 {
1296         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1297         u64 hver;
1298         int rc;
1299
1300         if (!rbd_dev)
1301                 return;
1302
1303         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1304                 rbd_dev->header_name, (unsigned long long) notify_id,
1305                 (unsigned int) opcode);
1306         rc = rbd_dev_refresh(rbd_dev, &hver);
1307         if (rc)
1308                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1309                            " update snaps: %d\n", rbd_dev->major, rc);
1310
1311         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1312 }
1313
1314 /*
1315  * Request sync osd watch
1316  */
1317 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1318 {
1319         struct ceph_osd_req_op *ops;
1320         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1321         int ret;
1322
1323         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1324         if (!ops)
1325                 return -ENOMEM;
1326
1327         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1328                                      (void *)rbd_dev, &rbd_dev->watch_event);
1329         if (ret < 0)
1330                 goto fail;
1331
1332         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1333         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1334         ops[0].watch.flag = 1;
1335
1336         ret = rbd_req_sync_op(rbd_dev, NULL,
1337                               CEPH_NOSNAP,
1338                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1339                               ops,
1340                               rbd_dev->header_name,
1341                               0, 0, NULL,
1342                               &rbd_dev->watch_request, NULL);
1343
1344         if (ret < 0)
1345                 goto fail_event;
1346
1347         rbd_destroy_ops(ops);
1348         return 0;
1349
1350 fail_event:
1351         ceph_osdc_cancel_event(rbd_dev->watch_event);
1352         rbd_dev->watch_event = NULL;
1353 fail:
1354         rbd_destroy_ops(ops);
1355         return ret;
1356 }
1357
1358 /*
1359  * Request sync osd unwatch
1360  */
1361 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1362 {
1363         struct ceph_osd_req_op *ops;
1364         int ret;
1365
1366         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1367         if (!ops)
1368                 return -ENOMEM;
1369
1370         ops[0].watch.ver = 0;
1371         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1372         ops[0].watch.flag = 0;
1373
1374         ret = rbd_req_sync_op(rbd_dev, NULL,
1375                               CEPH_NOSNAP,
1376                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1377                               ops,
1378                               rbd_dev->header_name,
1379                               0, 0, NULL, NULL, NULL);
1380
1381
1382         rbd_destroy_ops(ops);
1383         ceph_osdc_cancel_event(rbd_dev->watch_event);
1384         rbd_dev->watch_event = NULL;
1385         return ret;
1386 }
1387
1388 /*
1389  * Synchronous osd object method call
1390  */
1391 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1392                              const char *object_name,
1393                              const char *class_name,
1394                              const char *method_name,
1395                              const char *outbound,
1396                              size_t outbound_size,
1397                              char *inbound,
1398                              size_t inbound_size,
1399                              int flags,
1400                              u64 *ver)
1401 {
1402         struct ceph_osd_req_op *ops;
1403         int class_name_len = strlen(class_name);
1404         int method_name_len = strlen(method_name);
1405         int payload_size;
1406         int ret;
1407
1408         /*
1409          * Any input parameters required by the method we're calling
1410          * will be sent along with the class and method names as
1411          * part of the message payload.  That data and its size are
1412          * supplied via the indata and indata_len fields (named from
1413          * the perspective of the server side) in the OSD request
1414          * operation.
1415          */
1416         payload_size = class_name_len + method_name_len + outbound_size;
1417         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1418         if (!ops)
1419                 return -ENOMEM;
1420
1421         ops[0].cls.class_name = class_name;
1422         ops[0].cls.class_len = (__u8) class_name_len;
1423         ops[0].cls.method_name = method_name;
1424         ops[0].cls.method_len = (__u8) method_name_len;
1425         ops[0].cls.argc = 0;
1426         ops[0].cls.indata = outbound;
1427         ops[0].cls.indata_len = outbound_size;
1428
1429         ret = rbd_req_sync_op(rbd_dev, NULL,
1430                                CEPH_NOSNAP,
1431                                flags, ops,
1432                                object_name, 0, inbound_size, inbound,
1433                                NULL, ver);
1434
1435         rbd_destroy_ops(ops);
1436
1437         dout("cls_exec returned %d\n", ret);
1438         return ret;
1439 }
1440
1441 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1442 {
1443         struct rbd_req_coll *coll =
1444                         kzalloc(sizeof(struct rbd_req_coll) +
1445                                 sizeof(struct rbd_req_status) * num_reqs,
1446                                 GFP_ATOMIC);
1447
1448         if (!coll)
1449                 return NULL;
1450         coll->total = num_reqs;
1451         kref_init(&coll->kref);
1452         return coll;
1453 }
1454
1455 /*
1456  * block device queue callback
1457  */
1458 static void rbd_rq_fn(struct request_queue *q)
1459 {
1460         struct rbd_device *rbd_dev = q->queuedata;
1461         struct request *rq;
1462         struct bio_pair *bp = NULL;
1463
1464         while ((rq = blk_fetch_request(q))) {
1465                 struct bio *bio;
1466                 struct bio *rq_bio, *next_bio = NULL;
1467                 bool do_write;
1468                 unsigned int size;
1469                 u64 op_size = 0;
1470                 u64 ofs;
1471                 int num_segs, cur_seg = 0;
1472                 struct rbd_req_coll *coll;
1473                 struct ceph_snap_context *snapc;
1474
1475                 dout("fetched request\n");
1476
1477                 /* filter out block requests we don't understand */
1478                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1479                         __blk_end_request_all(rq, 0);
1480                         continue;
1481                 }
1482
1483                 /* deduce our operation (read, write) */
1484                 do_write = (rq_data_dir(rq) == WRITE);
1485
1486                 size = blk_rq_bytes(rq);
1487                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1488                 rq_bio = rq->bio;
1489                 if (do_write && rbd_dev->mapping.read_only) {
1490                         __blk_end_request_all(rq, -EROFS);
1491                         continue;
1492                 }
1493
1494                 spin_unlock_irq(q->queue_lock);
1495
1496                 down_read(&rbd_dev->header_rwsem);
1497
1498                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1499                                 !rbd_dev->mapping.snap_exists) {
1500                         up_read(&rbd_dev->header_rwsem);
1501                         dout("request for non-existent snapshot");
1502                         spin_lock_irq(q->queue_lock);
1503                         __blk_end_request_all(rq, -ENXIO);
1504                         continue;
1505                 }
1506
1507                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1508
1509                 up_read(&rbd_dev->header_rwsem);
1510
1511                 dout("%s 0x%x bytes at 0x%llx\n",
1512                      do_write ? "write" : "read",
1513                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1514
1515                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1516                 if (num_segs <= 0) {
1517                         spin_lock_irq(q->queue_lock);
1518                         __blk_end_request_all(rq, num_segs);
1519                         ceph_put_snap_context(snapc);
1520                         continue;
1521                 }
1522                 coll = rbd_alloc_coll(num_segs);
1523                 if (!coll) {
1524                         spin_lock_irq(q->queue_lock);
1525                         __blk_end_request_all(rq, -ENOMEM);
1526                         ceph_put_snap_context(snapc);
1527                         continue;
1528                 }
1529
1530                 do {
1531                         /* a bio clone to be passed down to OSD req */
1532                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1533                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1534                         kref_get(&coll->kref);
1535                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1536                                               op_size, GFP_ATOMIC);
1537                         if (bio)
1538                                 (void) rbd_do_op(rq, rbd_dev, snapc,
1539                                                 ofs, op_size,
1540                                                 bio, coll, cur_seg);
1541                         else
1542                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1543                                                        -ENOMEM, op_size);
1544                         size -= op_size;
1545                         ofs += op_size;
1546
1547                         cur_seg++;
1548                         rq_bio = next_bio;
1549                 } while (size > 0);
1550                 kref_put(&coll->kref, rbd_coll_release);
1551
1552                 if (bp)
1553                         bio_pair_release(bp);
1554                 spin_lock_irq(q->queue_lock);
1555
1556                 ceph_put_snap_context(snapc);
1557         }
1558 }
1559
1560 /*
1561  * a queue callback. Makes sure that we don't create a bio that spans across
1562  * multiple osd objects. One exception would be with a single page bios,
1563  * which we handle later at bio_chain_clone
1564  */
1565 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1566                           struct bio_vec *bvec)
1567 {
1568         struct rbd_device *rbd_dev = q->queuedata;
1569         unsigned int chunk_sectors;
1570         sector_t sector;
1571         unsigned int bio_sectors;
1572         int max;
1573
1574         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1575         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1576         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1577
1578         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1579                                  + bio_sectors)) << SECTOR_SHIFT;
1580         if (max < 0)
1581                 max = 0; /* bio_add cannot handle a negative return */
1582         if (max <= bvec->bv_len && bio_sectors == 0)
1583                 return bvec->bv_len;
1584         return max;
1585 }
1586
/*
 * Release the device's gendisk and request queue.  Safe against
 * partial setup: a NULL disk is ignored, a disk that was never added
 * (no GENHD_FL_UP) is not del_gendisk()ed, and a missing queue is
 * skipped.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1600
1601 /*
1602  * Read the complete header for the given rbd device.
1603  *
1604  * Returns a pointer to a dynamically-allocated buffer containing
1605  * the complete and validated header.  Caller can pass the address
1606  * of a variable that will be filled in with the version of the
1607  * header object at the time it was read.
1608  *
1609  * Returns a pointer-coded errno if a failure occurs.
1610  */
1611 static struct rbd_image_header_ondisk *
1612 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1613 {
1614         struct rbd_image_header_ondisk *ondisk = NULL;
1615         u32 snap_count = 0;
1616         u64 names_size = 0;
1617         u32 want_count;
1618         int ret;
1619
1620         /*
1621          * The complete header will include an array of its 64-bit
1622          * snapshot ids, followed by the names of those snapshots as
1623          * a contiguous block of NUL-terminated strings.  Note that
1624          * the number of snapshots could change by the time we read
1625          * it in, in which case we re-read it.
1626          */
1627         do {
1628                 size_t size;
1629
1630                 kfree(ondisk);
1631
1632                 size = sizeof (*ondisk);
1633                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1634                 size += names_size;
1635                 ondisk = kmalloc(size, GFP_KERNEL);
1636                 if (!ondisk)
1637                         return ERR_PTR(-ENOMEM);
1638
1639                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1640                                        rbd_dev->header_name,
1641                                        0, size,
1642                                        (char *) ondisk, version);
1643
1644                 if (ret < 0)
1645                         goto out_err;
1646                 if (WARN_ON((size_t) ret < size)) {
1647                         ret = -ENXIO;
1648                         pr_warning("short header read for image %s"
1649                                         " (want %zd got %d)\n",
1650                                 rbd_dev->image_name, size, ret);
1651                         goto out_err;
1652                 }
1653                 if (!rbd_dev_ondisk_valid(ondisk)) {
1654                         ret = -ENXIO;
1655                         pr_warning("invalid header for image %s\n",
1656                                 rbd_dev->image_name);
1657                         goto out_err;
1658                 }
1659
1660                 names_size = le64_to_cpu(ondisk->snap_names_len);
1661                 want_count = snap_count;
1662                 snap_count = le32_to_cpu(ondisk->snap_count);
1663         } while (snap_count != want_count);
1664
1665         return ondisk;
1666
1667 out_err:
1668         kfree(ondisk);
1669
1670         return ERR_PTR(ret);
1671 }
1672
1673 /*
1674  * reload the ondisk the header
1675  */
1676 static int rbd_read_header(struct rbd_device *rbd_dev,
1677                            struct rbd_image_header *header)
1678 {
1679         struct rbd_image_header_ondisk *ondisk;
1680         u64 ver = 0;
1681         int ret;
1682
1683         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1684         if (IS_ERR(ondisk))
1685                 return PTR_ERR(ondisk);
1686         ret = rbd_header_from_disk(header, ondisk);
1687         if (ret >= 0)
1688                 header->obj_version = ver;
1689         kfree(ondisk);
1690
1691         return ret;
1692 }
1693
/* Tear down every snapshot device attached to this rbd device. */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe iterator: __rbd_remove_snap_dev() presumably unlinks
	 * each entry from the list -- hence next is tracked */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1702
1703 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1704 {
1705         sector_t size;
1706
1707         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1708                 return;
1709
1710         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1711         dout("setting size to %llu sectors", (unsigned long long) size);
1712         rbd_dev->mapping.size = (u64) size;
1713         set_capacity(rbd_dev->disk, size);
1714 }
1715
1716 /*
1717  * only read the first part of the ondisk header, without the snaps info
1718  */
1719 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1720 {
1721         int ret;
1722         struct rbd_image_header h;
1723
1724         ret = rbd_read_header(rbd_dev, &h);
1725         if (ret < 0)
1726                 return ret;
1727
1728         down_write(&rbd_dev->header_rwsem);
1729
1730         /* Update image size, and check for resize of mapped image */
1731         rbd_dev->header.image_size = h.image_size;
1732         rbd_update_mapping_size(rbd_dev);
1733
1734         /* rbd_dev->header.object_prefix shouldn't change */
1735         kfree(rbd_dev->header.snap_sizes);
1736         kfree(rbd_dev->header.snap_names);
1737         /* osd requests may still refer to snapc */
1738         ceph_put_snap_context(rbd_dev->header.snapc);
1739
1740         if (hver)
1741                 *hver = h.obj_version;
1742         rbd_dev->header.obj_version = h.obj_version;
1743         rbd_dev->header.image_size = h.image_size;
1744         rbd_dev->header.snapc = h.snapc;
1745         rbd_dev->header.snap_names = h.snap_names;
1746         rbd_dev->header.snap_sizes = h.snap_sizes;
1747         /* Free the extra copy of the object prefix */
1748         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1749         kfree(h.object_prefix);
1750
1751         ret = rbd_dev_snaps_update(rbd_dev);
1752         if (!ret)
1753                 ret = rbd_dev_snaps_register(rbd_dev);
1754
1755         up_write(&rbd_dev->header_rwsem);
1756
1757         return ret;
1758 }
1759
1760 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1761 {
1762         int ret;
1763
1764         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1765         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1766         if (rbd_dev->image_format == 1)
1767                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1768         else
1769                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1770         mutex_unlock(&ctl_mutex);
1771
1772         return ret;
1773 }
1774
/*
 * Set up the gendisk and request queue for an rbd device.  Queue
 * limits are sized so a single I/O never exceeds one backing object,
 * and rbd_merge_bvec() keeps bios from straddling objects.  Returns
 * 0 on success, -ENOMEM if the disk or queue cannot be allocated.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
1823
1824 /*
1825   sysfs
1826 */
1827
/* Map a sysfs struct device back to its containing rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1832
/*
 * sysfs "size" attribute: current mapped size in bytes.
 *
 * get_capacity() returns 512-byte sectors; read under header_rwsem
 * so the value is consistent with a concurrent header refresh.
 */
static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        sector_t size;

        down_read(&rbd_dev->header_rwsem);
        size = get_capacity(rbd_dev->disk);
        up_read(&rbd_dev->header_rwsem);

        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1845
/*
 * sysfs "features" attribute, shown as a 64-bit hex mask.  Note this
 * shows the features for whatever's mapped (base image or snapshot),
 * which is not necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "0x%016llx\n",
                        (unsigned long long) rbd_dev->mapping.features);
}
1858
/* sysfs "major" attribute: block device major number for this image */
static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->major);
}
1866
/* sysfs "client_id" attribute: the ceph client instance id, as
 * "client<NNN>" (matches how the ceph tools name clients). */
static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                        ceph_client_id(rbd_dev->rbd_client->client));
}
1875
/* sysfs "pool" attribute: name of the rados pool the image lives in */
static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1883
/* sysfs "pool_id" attribute: numeric id of the image's rados pool.
 * NOTE(review): printed with %d — assumes pool_id is (or fits in)
 * an int; confirm against the rbd_device declaration. */
static ssize_t rbd_pool_id_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->pool_id);
}
1891
/* sysfs "name" attribute: the rbd image name */
static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_name);
}
1899
/* sysfs "image_id" attribute: the image's unique id (format 2) */
static ssize_t rbd_image_id_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_id);
}
1907
/*
 * sysfs "current_snap" attribute: the name of the currently-mapped
 * snapshot (or RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
}
1920
/*
 * sysfs "refresh" attribute (write-only): any write triggers a
 * re-read of the image header.  The written data is ignored; on
 * success the whole write is consumed (returns size), otherwise the
 * negative errno from the refresh is propagated.
 */
static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;

        ret = rbd_dev_refresh(rbd_dev, NULL);

        return ret < 0 ? ret : size;
}
1933
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only (writing triggers a header re-read).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1944
/* NULL-terminated list of the attributes above, wrapped in the
 * attribute group(s) attached to every rbd device via its type. */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
1967
/*
 * Empty type-level release.  NOTE(review): the rbd_device itself is
 * presumably freed via the dev->release callback installed at
 * registration time (rbd_dev_release in rbd_bus_add_dev), which
 * takes precedence over this stub — confirm against the device core.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1977
1978
1979 /*
1980   sysfs - snapshots
1981 */
1982
/* sysfs "snap_size" attribute: the snapshot's size in bytes */
static ssize_t rbd_snap_size_show(struct device *dev,
                                  struct device_attribute *attr,
                                  char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}
1991
/* sysfs "snap_id" attribute: the snapshot's numeric id */
static ssize_t rbd_snap_id_show(struct device *dev,
                                struct device_attribute *attr,
                                char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
2000
/* sysfs "snap_features" attribute: 64-bit hex feature mask of the
 * snapshot (always 0 for format 1 images) */
static ssize_t rbd_snap_features_show(struct device *dev,
                                struct device_attribute *attr,
                                char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "0x%016llx\n",
                        (unsigned long long) snap->features);
}
2010
/* Read-only sysfs attributes exposed under each snapshot's device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2025
/*
 * Release callback for a snapshot device: called by the device core
 * when the last reference is dropped; frees the snapshot name and
 * the rbd_snap itself.
 */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}
2032
/* Attribute groups and device type shared by all snapshot devices */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2042
/*
 * A snapshot's device type is only assigned when it gets registered
 * (see rbd_register_snap_dev), so the type pointer doubles as a
 * "registered" flag.  The assertion checks the two notions agree.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
        bool ret = snap->dev.type == &rbd_snap_device_type;
        bool reg = device_is_registered(&snap->dev);

        rbd_assert(!ret ^ reg);

        return ret;
}
2052
/*
 * Remove a snapshot from the device's snapshot list and unregister
 * its sysfs device.  Unregistering drops the device reference, so
 * the rbd_snap may be freed (via rbd_snap_dev_release) by the time
 * this returns — callers must not touch *snap afterwards.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2059
/*
 * Register the sysfs device for one snapshot under its parent rbd
 * device.  The device name is the snapshot name with the standard
 * snapshot prefix.  Returns the device_register() result.
 */
static int rbd_register_snap_dev(struct rbd_snap *snap,
                                  struct device *parent)
{
        struct device *dev = &snap->dev;
        int ret;

        /* Setting the type also marks the snapshot "registered" for
         * rbd_snap_registered() */
        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
        dout("%s: registering device for snapshot %s\n", __func__, snap->name);

        ret = device_register(dev);

        return ret;
}
2076
2077 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2078                                                 const char *snap_name,
2079                                                 u64 snap_id, u64 snap_size,
2080                                                 u64 snap_features)
2081 {
2082         struct rbd_snap *snap;
2083         int ret;
2084
2085         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2086         if (!snap)
2087                 return ERR_PTR(-ENOMEM);
2088
2089         ret = -ENOMEM;
2090         snap->name = kstrdup(snap_name, GFP_KERNEL);
2091         if (!snap->name)
2092                 goto err;
2093
2094         snap->id = snap_id;
2095         snap->size = snap_size;
2096         snap->features = snap_features;
2097
2098         return snap;
2099
2100 err:
2101         kfree(snap->name);
2102         kfree(snap);
2103
2104         return ERR_PTR(ret);
2105 }
2106
/*
 * Look up the name, size, and features of the snapshot at the given
 * index for a format 1 image.  The returned name points into the
 * header's packed snap_names buffer (a run of consecutive
 * NUL-terminated strings) and must NOT be freed by the caller.
 */
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
{
        char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        *snap_size = rbd_dev->header.snap_sizes[which];
        *snap_features = 0;     /* No features for v1 */

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return snap_name;
}
2125
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.  Issues a "get_size" class method call against the image
 * header object.  Returns 0 or a negative errno.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        /* Reply layout for the "get_size" method: order then size */
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long) snap_id, (unsigned int) *order,
                (unsigned long long) *snap_size);

        return 0;
}
2159
/* Fetch the base image's size and object order into the header */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2166
/*
 * Fetch the object name prefix for a format 2 image via the
 * "get_object_prefix" class method and store a freshly-allocated
 * copy in the header.  Returns 0 or a negative errno; on error the
 * header's object_prefix is left NULL.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        /* Reply is a length-prefixed string; copy it out */
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2204
/*
 * Fetch the feature mask for an image snapshot (or the base image if
 * snap_id is CEPH_NOSNAP) via the "get_features" class method.
 * Fails with -ENOTSUPP if the image requires any incompatible
 * feature this driver does not implement.  Returns 0 or a negative
 * errno.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features)
{
        __le64 snapid = cpu_to_le64(snap_id);
        /* Reply layout: full feature mask, then the incompatible subset */
        struct {
                __le64 features;
                __le64 incompat;
        } features_buf = { 0 };
        u64 incompat;
        int ret;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        /* Refuse to map if any incompatible feature is unknown to us */
        incompat = le64_to_cpu(features_buf.incompat);
        if (incompat & ~RBD_FEATURES_ALL)
                return -ENOTSUPP;

        *snap_features = le64_to_cpu(features_buf.features);

        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
                (unsigned long long) snap_id,
                (unsigned long long) *snap_features,
                (unsigned long long) le64_to_cpu(features_buf.incompat));

        return 0;
}
2238
/* Fetch the base image's feature mask into the header */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2244
2245 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2246 {
2247         size_t size;
2248         int ret;
2249         void *reply_buf;
2250         void *p;
2251         void *end;
2252         u64 seq;
2253         u32 snap_count;
2254         struct ceph_snap_context *snapc;
2255         u32 i;
2256
2257         /*
2258          * We'll need room for the seq value (maximum snapshot id),
2259          * snapshot count, and array of that many snapshot ids.
2260          * For now we have a fixed upper limit on the number we're
2261          * prepared to receive.
2262          */
2263         size = sizeof (__le64) + sizeof (__le32) +
2264                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2265         reply_buf = kzalloc(size, GFP_KERNEL);
2266         if (!reply_buf)
2267                 return -ENOMEM;
2268
2269         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2270                                 "rbd", "get_snapcontext",
2271                                 NULL, 0,
2272                                 reply_buf, size,
2273                                 CEPH_OSD_FLAG_READ, ver);
2274         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2275         if (ret < 0)
2276                 goto out;
2277
2278         ret = -ERANGE;
2279         p = reply_buf;
2280         end = (char *) reply_buf + size;
2281         ceph_decode_64_safe(&p, end, seq, out);
2282         ceph_decode_32_safe(&p, end, snap_count, out);
2283
2284         /*
2285          * Make sure the reported number of snapshot ids wouldn't go
2286          * beyond the end of our buffer.  But before checking that,
2287          * make sure the computed size of the snapshot context we
2288          * allocate is representable in a size_t.
2289          */
2290         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2291                                  / sizeof (u64)) {
2292                 ret = -EINVAL;
2293                 goto out;
2294         }
2295         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2296                 goto out;
2297
2298         size = sizeof (struct ceph_snap_context) +
2299                                 snap_count * sizeof (snapc->snaps[0]);
2300         snapc = kmalloc(size, GFP_KERNEL);
2301         if (!snapc) {
2302                 ret = -ENOMEM;
2303                 goto out;
2304         }
2305
2306         atomic_set(&snapc->nref, 1);
2307         snapc->seq = seq;
2308         snapc->num_snaps = snap_count;
2309         for (i = 0; i < snap_count; i++)
2310                 snapc->snaps[i] = ceph_decode_64(&p);
2311
2312         rbd_dev->header.snapc = snapc;
2313
2314         dout("  snap context seq = %llu, snap_count = %u\n",
2315                 (unsigned long long) seq, (unsigned int) snap_count);
2316
2317 out:
2318         kfree(reply_buf);
2319
2320         return 0;
2321 }
2322
/*
 * Fetch the name of the snapshot at the given index in the snap
 * context via the "get_snapshot_name" class method.  Returns a
 * freshly-allocated, NUL-terminated name the caller must free, or an
 * ERR_PTR on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        size_t snap_name_len;
        char *snap_name;

        /* Reply is a length-prefixed string: 32-bit length + bytes */
        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name_len = 0;
        snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
                                GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout("  snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
2369
2370 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2371                 u64 *snap_size, u64 *snap_features)
2372 {
2373         __le64 snap_id;
2374         u8 order;
2375         int ret;
2376
2377         snap_id = rbd_dev->header.snapc->snaps[which];
2378         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2379         if (ret)
2380                 return ERR_PTR(ret);
2381         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2382         if (ret)
2383                 return ERR_PTR(ret);
2384
2385         return rbd_dev_v2_snap_name(rbd_dev, which);
2386 }
2387
2388 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2389                 u64 *snap_size, u64 *snap_features)
2390 {
2391         if (rbd_dev->image_format == 1)
2392                 return rbd_dev_v1_snap_info(rbd_dev, which,
2393                                         snap_size, snap_features);
2394         if (rbd_dev->image_format == 2)
2395                 return rbd_dev_v2_snap_info(rbd_dev, which,
2396                                         snap_size, snap_features);
2397         return ERR_PTR(-EINVAL);
2398 }
2399
2400 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2401 {
2402         int ret;
2403         __u8 obj_order;
2404
2405         down_write(&rbd_dev->header_rwsem);
2406
2407         /* Grab old order first, to see if it changes */
2408
2409         obj_order = rbd_dev->header.obj_order,
2410         ret = rbd_dev_v2_image_size(rbd_dev);
2411         if (ret)
2412                 goto out;
2413         if (rbd_dev->header.obj_order != obj_order) {
2414                 ret = -EIO;
2415                 goto out;
2416         }
2417         rbd_update_mapping_size(rbd_dev);
2418
2419         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2420         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2421         if (ret)
2422                 goto out;
2423         ret = rbd_dev_snaps_update(rbd_dev);
2424         dout("rbd_dev_snaps_update returned %d\n", ret);
2425         if (ret)
2426                 goto out;
2427         ret = rbd_dev_snaps_register(rbd_dev);
2428         dout("rbd_dev_snaps_register returned %d\n", ret);
2429 out:
2430         up_write(&rbd_dev->header_rwsem);
2431
2432         return ret;
2433 }
2434
2435 /*
2436  * Scan the rbd device's current snapshot list and compare it to the
2437  * newly-received snapshot context.  Remove any existing snapshots
2438  * not present in the new snapshot context.  Add a new snapshot for
2439  * any snaphots in the snapshot context not in the current list.
2440  * And verify there are no changes to snapshots we already know
2441  * about.
2442  *
2443  * Assumes the snapshots in the snapshot context are sorted by
2444  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2445  * are also maintained in that order.)
2446  */
2447 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2448 {
2449         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2450         const u32 snap_count = snapc->num_snaps;
2451         struct list_head *head = &rbd_dev->snaps;
2452         struct list_head *links = head->next;
2453         u32 index = 0;
2454
2455         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2456         while (index < snap_count || links != head) {
2457                 u64 snap_id;
2458                 struct rbd_snap *snap;
2459                 char *snap_name;
2460                 u64 snap_size = 0;
2461                 u64 snap_features = 0;
2462
2463                 snap_id = index < snap_count ? snapc->snaps[index]
2464                                              : CEPH_NOSNAP;
2465                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2466                                      : NULL;
2467                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2468
2469                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2470                         struct list_head *next = links->next;
2471
2472                         /* Existing snapshot not in the new snap context */
2473
2474                         if (rbd_dev->mapping.snap_id == snap->id)
2475                                 rbd_dev->mapping.snap_exists = false;
2476                         __rbd_remove_snap_dev(snap);
2477                         dout("%ssnap id %llu has been removed\n",
2478                                 rbd_dev->mapping.snap_id == snap->id ?
2479                                                                 "mapped " : "",
2480                                 (unsigned long long) snap->id);
2481
2482                         /* Done with this list entry; advance */
2483
2484                         links = next;
2485                         continue;
2486                 }
2487
2488                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2489                                         &snap_size, &snap_features);
2490                 if (IS_ERR(snap_name))
2491                         return PTR_ERR(snap_name);
2492
2493                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2494                         (unsigned long long) snap_id);
2495                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2496                         struct rbd_snap *new_snap;
2497
2498                         /* We haven't seen this snapshot before */
2499
2500                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2501                                         snap_id, snap_size, snap_features);
2502                         if (IS_ERR(new_snap)) {
2503                                 int err = PTR_ERR(new_snap);
2504
2505                                 dout("  failed to add dev, error %d\n", err);
2506
2507                                 return err;
2508                         }
2509
2510                         /* New goes before existing, or at end of list */
2511
2512                         dout("  added dev%s\n", snap ? "" : " at end\n");
2513                         if (snap)
2514                                 list_add_tail(&new_snap->node, &snap->node);
2515                         else
2516                                 list_add_tail(&new_snap->node, head);
2517                 } else {
2518                         /* Already have this one */
2519
2520                         dout("  already present\n");
2521
2522                         rbd_assert(snap->size == snap_size);
2523                         rbd_assert(!strcmp(snap->name, snap_name));
2524                         rbd_assert(snap->features == snap_features);
2525
2526                         /* Done with this list entry; advance */
2527
2528                         links = links->next;
2529                 }
2530
2531                 /* Advance to the next entry in the snapshot context */
2532
2533                 index++;
2534         }
2535         dout("%s: done\n", __func__);
2536
2537         return 0;
2538 }
2539
/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.  Stops at the first failure and
 * returns its errno; returns 0 when everything is registered.
 * Requires the parent rbd device to already be registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        int ret = 0;

        dout("%s called\n", __func__);
        if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
                return -EIO;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!rbd_snap_registered(snap)) {
                        ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                        if (ret < 0)
                                break;
                }
        }
        dout("%s: returning %d\n", __func__, ret);

        return ret;
}
2564
/*
 * Register the rbd device on the rbd bus under the shared rbd root
 * device.  The sysfs name is the numeric device id.  Serialized via
 * ctl_mutex; returns the device_register() result.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        mutex_unlock(&ctl_mutex);

        return ret;
}
2584
/* Unregister the rbd device from the bus (drops the sysfs device;
 * rbd_dev_release runs when the last reference goes away) */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2589
/*
 * Set up the header-object watch.  A watch request can fail with
 * -ERANGE when our cached header version is stale; in that case
 * refresh the header and retry until the watch sticks or a
 * different error occurs.  Returns the final watch result (0 on
 * success) or a refresh error.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
        int ret, rc;

        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
                        rc = rbd_dev_refresh(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
        } while (ret == -ERANGE);

        return ret;
}
2605
/* Highest device id handed out so far; ids start at 1 (see below) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2607
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 * The id counter only grows here; rbd_dev_id_put() may shrink it
 * back when the max id is released.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
2622
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.  If the released id was the
 * current maximum, the maximum is recomputed from the remaining
 * devices so ids can be reused.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;

        rbd_assert(rbd_id > 0);

        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);

        /*
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }

        /*
         * We need to update the current maximum id.  Search the
         * list to find out what it is.  We're more likely to find
         * the maximum at the end, so search the list backward.
         */
        max_id = 0;
        list_for_each_prev(tmp, &rbd_dev_list) {
                struct rbd_device *rbd_dev;

                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id > max_id)
                        max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);

        /*
         * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
        dout("  max dev id has been reset\n");
}
2673
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * Characters for which isspace() returns nonzero in the
         * "C" and "POSIX" locales.
         */
        static const char spaces[] = " \f\n\r\t\v";
        const char *p = *buf;

        p += strspn(p, spaces);         /* skip to start of token */
        *buf = p;

        return strcspn(p, spaces);      /* length of token at p */
}
2692
2693 /*
2694  * Finds the next token in *buf, and if the provided token buffer is
2695  * big enough, copies the found token into it.  The result, if
2696  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2697  * must be terminated with '\0' on entry.
2698  *
2699  * Returns the length of the token found (not including the '\0').
2700  * Return value will be 0 if no token is found, and it will be >=
2701  * token_size if the token would not fit.
2702  *
2703  * The *buf pointer will be updated to point beyond the end of the
2704  * found token.  Note that this occurs even if the token buffer is
2705  * too small to hold it.
2706  */
/*
 * Find the next token in *buf and, provided it fits (with room for
 * a terminating '\0'), copy it into the caller's token buffer.
 * *buf must be '\0'-terminated on entry, and is advanced past the
 * token even when the token buffer is too small to hold it.
 *
 * Returns the token's length (excluding the '\0'); 0 means no
 * token was found, and a value >= token_size means it didn't fit
 * (in which case the token buffer is left untouched).
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2722
2723 /*
2724  * Finds the next token in *buf, dynamically allocates a buffer big
2725  * enough to hold a copy of it, and copies the token into the new
2726  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2727  * that a duplicate buffer is created even for a zero-length token.
2728  *
2729  * Returns a pointer to the newly-allocated duplicate, or a null
2730  * pointer if memory for the duplicate was not available.  If
2731  * the lenp argument is a non-null pointer, the length of the token
2732  * (not including the '\0') is returned in *lenp.
2733  *
2734  * If successful, the *buf pointer will be updated to point beyond
2735  * the end of the found token.
2736  *
2737  * Note: uses GFP_KERNEL for allocation.
2738  */
2739 static inline char *dup_token(const char **buf, size_t *lenp)
2740 {
2741         char *dup;
2742         size_t len;
2743
2744         len = next_token(buf);
2745         dup = kmalloc(len + 1, GFP_KERNEL);
2746         if (!dup)
2747                 return NULL;
2748
2749         memcpy(dup, *buf, len);
2750         *(dup + len) = '\0';
2751         *buf += len;
2752
2753         if (lenp)
2754                 *lenp = len;
2755
2756         return dup;
2757 }
2758
2759 /*
2760  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2761  * rbd_md_name, and name fields of the given rbd_dev, based on the
2762  * list of monitor addresses and other options provided via
2763  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2764  * copy of the snapshot name to map if successful, or a
2765  * pointer-coded error otherwise.
2766  *
2767  * Note: rbd_dev is assumed to have been initially zero-filled.
2768  */
2769 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2770                                 const char *buf,
2771                                 const char **mon_addrs,
2772                                 size_t *mon_addrs_size,
2773                                 char *options,
2774                                 size_t options_size)
2775 {
2776         size_t len;
2777         char *err_ptr = ERR_PTR(-EINVAL);
2778         char *snap_name;
2779
2780         /* The first four tokens are required */
2781
2782         len = next_token(&buf);
2783         if (!len)
2784                 return err_ptr;
2785         *mon_addrs_size = len + 1;
2786         *mon_addrs = buf;
2787
2788         buf += len;
2789
2790         len = copy_token(&buf, options, options_size);
2791         if (!len || len >= options_size)
2792                 return err_ptr;
2793
2794         err_ptr = ERR_PTR(-ENOMEM);
2795         rbd_dev->pool_name = dup_token(&buf, NULL);
2796         if (!rbd_dev->pool_name)
2797                 goto out_err;
2798
2799         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2800         if (!rbd_dev->image_name)
2801                 goto out_err;
2802
2803         /* Snapshot name is optional; default is to use "head" */
2804
2805         len = next_token(&buf);
2806         if (len > RBD_MAX_SNAP_NAME_LEN) {
2807                 err_ptr = ERR_PTR(-ENAMETOOLONG);
2808                 goto out_err;
2809         }
2810         if (!len) {
2811                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2812                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2813         }
2814         snap_name = kmalloc(len + 1, GFP_KERNEL);
2815         if (!snap_name)
2816                 goto out_err;
2817         memcpy(snap_name, buf, len);
2818         *(snap_name + len) = '\0';
2819
2820         return snap_name;
2821
2822 out_err:
2823         kfree(rbd_dev->image_name);
2824         rbd_dev->image_name = NULL;
2825         rbd_dev->image_name_len = 0;
2826         kfree(rbd_dev->pool_name);
2827         rbd_dev->pool_name = NULL;
2828
2829         return err_ptr;
2830 }
2831
2832 /*
2833  * An rbd format 2 image has a unique identifier, distinct from the
2834  * name given to it by the user.  Internally, that identifier is
2835  * what's used to specify the names of objects related to the image.
2836  *
2837  * A special "rbd id" object is used to map an rbd image name to its
2838  * id.  If that object doesn't exist, then there is no v2 rbd image
2839  * with the supplied name.
2840  *
2841  * This function will record the given rbd_dev's image_id field if
2842  * it can be determined, and in that case will return 0.  If any
2843  * errors occur a negative errno will be returned and the rbd_dev's
2844  * image_id field will be unchanged (and should be NULL).
2845  */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        void *p;

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        /* The id object is named "<RBD_ID_PREFIX><image_name>" */
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* Invoke the class method "rbd.get_id" on the id object */
        ret = rbd_req_sync_exec(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        /* Decode the length-prefixed id string into image_id */
        p = response;
        rbd_dev->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
                                                &rbd_dev->image_id_len,
                                                GFP_NOIO);
        if (IS_ERR(rbd_dev->image_id)) {
                ret = PTR_ERR(rbd_dev->image_id);
                /* Callers expect image_id to remain NULL on failure */
                rbd_dev->image_id = NULL;
        } else {
                dout("image_id is %s\n", rbd_dev->image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
2901
2902 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2903 {
2904         int ret;
2905         size_t size;
2906
2907         /* Version 1 images have no id; empty string is used */
2908
2909         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2910         if (!rbd_dev->image_id)
2911                 return -ENOMEM;
2912         rbd_dev->image_id_len = 0;
2913
2914         /* Record the header object name for this rbd image. */
2915
2916         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2917         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918         if (!rbd_dev->header_name) {
2919                 ret = -ENOMEM;
2920                 goto out_err;
2921         }
2922         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2923
2924         /* Populate rbd image metadata */
2925
2926         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2927         if (ret < 0)
2928                 goto out_err;
2929         rbd_dev->image_format = 1;
2930
2931         dout("discovered version 1 image, header name is %s\n",
2932                 rbd_dev->header_name);
2933
2934         return 0;
2935
2936 out_err:
2937         kfree(rbd_dev->header_name);
2938         rbd_dev->header_name = NULL;
2939         kfree(rbd_dev->image_id);
2940         rbd_dev->image_id = NULL;
2941
2942         return ret;
2943 }
2944
/*
 * Probe for a format 2 (v2) image whose id has already been stored
 * in rbd_dev->image_id.  Records the header object name and fills
 * in the v2 image metadata: size/order, object prefix, features,
 * and the snapshot context (plus header version).  Returns 0 on
 * success or a negative errno, with header_name and the header's
 * object_prefix left NULL on failure.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        size_t size;
        int ret;
        u64 ver = 0;

        /*
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
        size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, rbd_dev->image_id);

        /* Get the size and object order for the image */

        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get and check the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */

        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        rbd_dev->image_format = 2;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}
3006
3007 /*
3008  * Probe for the existence of the header object for the given rbd
3009  * device.  For format 2 images this includes determining the image
3010  * id.
3011  */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to read the image id object first.  Only format 2
	 * images have one; if the lookup fails (ENOENT for a
	 * format 1 image) fall back to the v1 probe, otherwise
	 * continue with the v2 probe.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	ret = ret ? rbd_dev_v1_probe(rbd_dev) : rbd_dev_v2_probe(rbd_dev);
	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3031
/*
 * Handle a write to /sys/bus/rbd/add: parse the argument string,
 * connect to the cluster, probe the image, and register the new
 * block device with the kernel.  Returns count on success or a
 * negative errno.  On failure before rbd_bus_add_dev() succeeds,
 * everything is unwound here via the goto ladder; afterwards,
 * cleanup is delegated to the sysfs release path.
 */
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        char *options;
        struct rbd_device *rbd_dev = NULL;
        const char *mon_addrs = NULL;
        size_t mon_addrs_size = 0;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;
        char *snap_name;

        /* Hold a module reference for the lifetime of the device */
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* The options token can be at most count - 1 chars plus '\0' */
        options = kmalloc(count, GFP_KERNEL);
        if (!options)
                goto err_out_mem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                goto err_out_mem;

        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        /* parse add command */
        snap_name = rbd_add_parse_args(rbd_dev, buf,
                                &mon_addrs, &mon_addrs_size, options, count);
        if (IS_ERR(snap_name)) {
                rc = PTR_ERR(snap_name);
                goto err_out_mem;
        }

        /*
         * NOTE(review): snap_name appears to be leaked if any of
         * the steps below fail before rbd_dev_set_mapping() stores
         * it in rbd_dev->mapping.snap_name (err_out_args only frees
         * mapping.snap_name, which is still NULL then) -- confirm.
         */
        rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
        if (rc < 0)
                goto err_out_args;

        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
        if (rc < 0)
                goto err_out_client;
        rbd_dev->pool_id = rc;

        /* Determine image format (1 or 2) and read its metadata */
        rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_client;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = rbd_dev_snaps_update(rbd_dev);
        if (rc)
                goto err_out_header;

        rc = rbd_dev_set_mapping(rbd_dev, snap_name);
        if (rc)
                goto err_out_header;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        rc = register_blkdev(0, rbd_dev->name);
        if (rc < 0)
                goto err_out_id;
        rbd_dev->major = rc;

        /* Set up the blkdev mapping. */

        rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_blkdev;

        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */

        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (rc)
                goto err_out_bus;

        rc = rbd_init_watch_dev(rbd_dev);
        if (rc)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return count;

err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */

        rbd_bus_del_dev(rbd_dev);
        kfree(options);
        return rc;

        /* Pre-registration failures: unwind in reverse setup order */
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_header:
        rbd_header_free(&rbd_dev->header);
err_out_client:
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev);
        kfree(rbd_dev->image_id);
err_out_args:
        kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->image_name);
        kfree(rbd_dev->pool_name);
err_out_mem:
        kfree(rbd_dev);
        kfree(options);

        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);

        return (ssize_t) rc;
}
3173
3174 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3175 {
3176         struct list_head *tmp;
3177         struct rbd_device *rbd_dev;
3178
3179         spin_lock(&rbd_dev_list_lock);
3180         list_for_each(tmp, &rbd_dev_list) {
3181                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3182                 if (rbd_dev->dev_id == dev_id) {
3183                         spin_unlock(&rbd_dev_list_lock);
3184                         return rbd_dev;
3185                 }
3186         }
3187         spin_unlock(&rbd_dev_list_lock);
3188         return NULL;
3189 }
3190
/*
 * Device-model release callback, invoked when the last reference
 * to the rbd device's embedded struct device is dropped (the
 * teardown is initiated via rbd_bus_del_dev()).  Undoes the setup
 * performed by rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        /* Stop watching the header object for changes */
        if (rbd_dev->watch_request) {
                struct ceph_client *client = rbd_dev->rbd_client->client;

                ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
        }
        if (rbd_dev->watch_event)
                rbd_req_sync_unwatch(rbd_dev);

        /* Drop our reference on the ceph client */
        rbd_put_client(rbd_dev);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->image_id);
        kfree(rbd_dev->header_name);
        kfree(rbd_dev->pool_name);
        kfree(rbd_dev->image_name);
        rbd_dev_id_put(rbd_dev);
        kfree(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}
3225
3226 static ssize_t rbd_remove(struct bus_type *bus,
3227                           const char *buf,
3228                           size_t count)
3229 {
3230         struct rbd_device *rbd_dev = NULL;
3231         int target_id, rc;
3232         unsigned long ul;
3233         int ret = count;
3234
3235         rc = strict_strtoul(buf, 10, &ul);
3236         if (rc)
3237                 return rc;
3238
3239         /* convert to int; abort if we lost anything in the conversion */
3240         target_id = (int) ul;
3241         if (target_id != ul)
3242                 return -EINVAL;
3243
3244         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3245
3246         rbd_dev = __rbd_get_dev(target_id);
3247         if (!rbd_dev) {
3248                 ret = -ENOENT;
3249                 goto done;
3250         }
3251
3252         __rbd_remove_all_snaps(rbd_dev);
3253         rbd_bus_del_dev(rbd_dev);
3254
3255 done:
3256         mutex_unlock(&ctl_mutex);
3257
3258         return ret;
3259 }
3260
3261 /*
3262  * create control files in sysfs
3263  * /sys/bus/rbd/...
3264  */
3265 static int rbd_sysfs_init(void)
3266 {
3267         int ret;
3268
3269         ret = device_register(&rbd_root_dev);
3270         if (ret < 0)
3271                 return ret;
3272
3273         ret = bus_register(&rbd_bus_type);
3274         if (ret < 0)
3275                 device_unregister(&rbd_root_dev);
3276
3277         return ret;
3278 }
3279
static void rbd_sysfs_cleanup(void)
{
        /* Tear down in the reverse order of rbd_sysfs_init() */
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
3285
3286 int __init rbd_init(void)
3287 {
3288         int rc;
3289
3290         rc = rbd_sysfs_init();
3291         if (rc)
3292                 return rc;
3293         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3294         return 0;
3295 }
3296
void __exit rbd_exit(void)
{
        /* Remove the sysfs control files created by rbd_init() */
        rbd_sysfs_cleanup();
}
3301
3302 module_init(rbd_init);
3303 module_exit(rbd_exit);
3304
3305 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3306 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3307 MODULE_DESCRIPTION("rados block device");
3308
3309 /* following authorship retained from original osdblk.c */
3310 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3311
3312 MODULE_LICENSE("GPL");