]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: drop rbd_do_op() opcode and flags
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* When defined, rbd_assert() performs real checks */

/*
 * Block I/O is counted in sectors.  Several parts of Linux (blk, bio,
 * genhd) interpret the term, but the default everywhere is 512 bytes.
 * These names are only marginally more descriptive than the raw
 * numbers they stand for.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Largest u64 value; a candidate for a more widely shared header */

#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* minors available per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_SNAP_COUNT      510     /* keeps the largest snapc within 4KB */
#define RBD_MAX_OPT_LEN         1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME      "-"

#define RBD_IMAGE_ID_LEN_MAX    64
#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Image feature bits */

#define RBD_FEATURE_LAYERING      1

/* Feature bits this client implementation supports (currently none) */

#define RBD_FEATURES_ALL          (0)

/*
 * RBD devices are named "rbd#": RBD_DRV_NAME followed by a unique
 * integer id.  MAX_INT_FORMAT_WIDTH helps make sure DEV_NAME_LEN is
 * large enough for every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT           false
91
92 /*
93  * block device image metadata (in-memory version)
94  */
95 struct rbd_image_header {
96         /* These four fields never change for a given rbd image */
97         char *object_prefix;
98         u64 features;
99         __u8 obj_order;
100         __u8 crypt_type;
101         __u8 comp_type;
102
103         /* The remaining fields need to be updated occasionally */
104         u64 image_size;
105         struct ceph_snap_context *snapc;
106         char *snap_names;
107         u64 *snap_sizes;
108
109         u64 obj_version;
110 };
111
112 struct rbd_options {
113         bool    read_only;
114 };
115
116 /*
117  * an instance of the client.  multiple devices may share an rbd client.
118  */
119 struct rbd_client {
120         struct ceph_client      *client;
121         struct kref             kref;
122         struct list_head        node;
123 };
124
125 /*
126  * a request completion status
127  */
128 struct rbd_req_status {
129         int done;
130         int rc;
131         u64 bytes;
132 };
133
134 /*
135  * a collection of requests
136  */
137 struct rbd_req_coll {
138         int                     total;
139         int                     num_done;
140         struct kref             kref;
141         struct rbd_req_status   status[0];
142 };
143
144 /*
145  * a single io request
146  */
147 struct rbd_request {
148         struct request          *rq;            /* blk layer request */
149         struct bio              *bio;           /* cloned bio */
150         struct page             **pages;        /* list of used pages */
151         u64                     len;
152         int                     coll_index;
153         struct rbd_req_coll     *coll;
154 };
155
156 struct rbd_snap {
157         struct  device          dev;
158         const char              *name;
159         u64                     size;
160         struct list_head        node;
161         u64                     id;
162         u64                     features;
163 };
164
165 struct rbd_mapping {
166         char                    *snap_name;
167         u64                     snap_id;
168         u64                     size;
169         u64                     features;
170         bool                    snap_exists;
171         bool                    read_only;
172 };
173
174 /*
175  * a single device
176  */
177 struct rbd_device {
178         int                     dev_id;         /* blkdev unique id */
179
180         int                     major;          /* blkdev assigned major */
181         struct gendisk          *disk;          /* blkdev's gendisk and rq */
182
183         u32                     image_format;   /* Either 1 or 2 */
184         struct rbd_options      rbd_opts;
185         struct rbd_client       *rbd_client;
186
187         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
188
189         spinlock_t              lock;           /* queue lock */
190
191         struct rbd_image_header header;
192         char                    *image_id;
193         size_t                  image_id_len;
194         char                    *image_name;
195         size_t                  image_name_len;
196         char                    *header_name;
197         char                    *pool_name;
198         int                     pool_id;
199
200         struct ceph_osd_event   *watch_event;
201         struct ceph_osd_request *watch_request;
202
203         /* protects updating the header */
204         struct rw_semaphore     header_rwsem;
205
206         struct rbd_mapping      mapping;
207
208         struct list_head        node;
209
210         /* list of snapshots */
211         struct list_head        snaps;
212
213         /* sysfs related */
214         struct device           dev;
215 };
216
217 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
218
219 static LIST_HEAD(rbd_dev_list);    /* devices */
220 static DEFINE_SPINLOCK(rbd_dev_list_lock);
221
222 static LIST_HEAD(rbd_client_list);              /* clients */
223 static DEFINE_SPINLOCK(rbd_client_list_lock);
224
225 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
226 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
227
228 static void rbd_dev_release(struct device *dev);
229 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
230
231 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
232                        size_t count);
233 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
234                           size_t count);
235
236 static struct bus_attribute rbd_bus_attrs[] = {
237         __ATTR(add, S_IWUSR, NULL, rbd_add),
238         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
239         __ATTR_NULL
240 };
241
242 static struct bus_type rbd_bus_type = {
243         .name           = "rbd",
244         .bus_attrs      = rbd_bus_attrs,
245 };
246
/* Release callback for rbd_root_dev; intentionally does nothing. */
static void rbd_root_dev_release(struct device *dev)
{
}
250
251 static struct device rbd_root_dev = {
252         .init_name =    "rbd",
253         .release =      rbd_root_dev_release,
254 };
255
/*
 * rbd_assert(expr) - debug-only assertion.
 *
 * With RBD_DEBUG defined, a false @expr logs the function name, line
 * number and the failing expression text, then calls BUG().  Without
 * RBD_DEBUG the macro expands to a no-op expression and @expr is not
 * evaluated.
 *
 * The checking variant is wrapped in do { } while (0) so that it
 * behaves as a single statement: a bare "if { }" expansion would
 * capture a following "else" when the macro is used in an unbraced
 * if/else.  Both variants need a terminating semicolon at the call
 * site.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
268
269 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
270 {
271         return get_device(&rbd_dev->dev);
272 }
273
274 static void rbd_put_dev(struct rbd_device *rbd_dev)
275 {
276         put_device(&rbd_dev->dev);
277 }
278
279 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
280 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
281
282 static int rbd_open(struct block_device *bdev, fmode_t mode)
283 {
284         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
285
286         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
287                 return -EROFS;
288
289         rbd_get_dev(rbd_dev);
290         set_device_ro(bdev, rbd_dev->mapping.read_only);
291
292         return 0;
293 }
294
295 static int rbd_release(struct gendisk *disk, fmode_t mode)
296 {
297         struct rbd_device *rbd_dev = disk->private_data;
298
299         rbd_put_dev(rbd_dev);
300
301         return 0;
302 }
303
304 static const struct block_device_operations rbd_bd_ops = {
305         .owner                  = THIS_MODULE,
306         .open                   = rbd_open,
307         .release                = rbd_release,
308 };
309
310 /*
311  * Initialize an rbd client instance.
312  * We own *ceph_opts.
313  */
314 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
315 {
316         struct rbd_client *rbdc;
317         int ret = -ENOMEM;
318
319         dout("rbd_client_create\n");
320         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
321         if (!rbdc)
322                 goto out_opt;
323
324         kref_init(&rbdc->kref);
325         INIT_LIST_HEAD(&rbdc->node);
326
327         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
328
329         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
330         if (IS_ERR(rbdc->client))
331                 goto out_mutex;
332         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
333
334         ret = ceph_open_session(rbdc->client);
335         if (ret < 0)
336                 goto out_err;
337
338         spin_lock(&rbd_client_list_lock);
339         list_add_tail(&rbdc->node, &rbd_client_list);
340         spin_unlock(&rbd_client_list_lock);
341
342         mutex_unlock(&ctl_mutex);
343
344         dout("rbd_client_create created %p\n", rbdc);
345         return rbdc;
346
347 out_err:
348         ceph_destroy_client(rbdc->client);
349 out_mutex:
350         mutex_unlock(&ctl_mutex);
351         kfree(rbdc);
352 out_opt:
353         if (ceph_opts)
354                 ceph_destroy_options(ceph_opts);
355         return ERR_PTR(ret);
356 }
357
358 /*
359  * Find a ceph client with specific addr and configuration.  If
360  * found, bump its reference count.
361  */
362 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
363 {
364         struct rbd_client *client_node;
365         bool found = false;
366
367         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
368                 return NULL;
369
370         spin_lock(&rbd_client_list_lock);
371         list_for_each_entry(client_node, &rbd_client_list, node) {
372                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
373                         kref_get(&client_node->kref);
374                         found = true;
375                         break;
376                 }
377         }
378         spin_unlock(&rbd_client_list_lock);
379
380         return found ? client_node : NULL;
381 }
382
/*
 * Mount option tokens.  The Opt_last_* markers separate the token
 * classes: integer-valued tokens come before Opt_last_int,
 * string-valued ones before Opt_last_string, and Boolean ones before
 * Opt_last_bool.
 */
enum {
        /* (no int-valued options yet) */
        Opt_last_int,
        /* (no string-valued options yet) */
        Opt_last_string,
        /* Boolean options */
        Opt_read_only,
        Opt_read_write,
        Opt_last_bool,
};
396
397 static match_table_t rbd_opts_tokens = {
398         /* int args above */
399         /* string args above */
400         {Opt_read_only, "read_only"},
401         {Opt_read_only, "ro"},          /* Alternate spelling */
402         {Opt_read_write, "read_write"},
403         {Opt_read_write, "rw"},         /* Alternate spelling */
404         /* Boolean args above */
405         {-1, NULL}
406 };
407
408 static int parse_rbd_opts_token(char *c, void *private)
409 {
410         struct rbd_options *rbd_opts = private;
411         substring_t argstr[MAX_OPT_ARGS];
412         int token, intval, ret;
413
414         token = match_token(c, rbd_opts_tokens, argstr);
415         if (token < 0)
416                 return -EINVAL;
417
418         if (token < Opt_last_int) {
419                 ret = match_int(&argstr[0], &intval);
420                 if (ret < 0) {
421                         pr_err("bad mount option arg (not int) "
422                                "at '%s'\n", c);
423                         return ret;
424                 }
425                 dout("got int token %d val %d\n", token, intval);
426         } else if (token > Opt_last_int && token < Opt_last_string) {
427                 dout("got string token %d val %s\n", token,
428                      argstr[0].from);
429         } else if (token > Opt_last_string && token < Opt_last_bool) {
430                 dout("got Boolean token %d\n", token);
431         } else {
432                 dout("got token %d\n", token);
433         }
434
435         switch (token) {
436         case Opt_read_only:
437                 rbd_opts->read_only = true;
438                 break;
439         case Opt_read_write:
440                 rbd_opts->read_only = false;
441                 break;
442         default:
443                 rbd_assert(false);
444                 break;
445         }
446         return 0;
447 }
448
449 /*
450  * Get a ceph client with specific addr and configuration, if one does
451  * not exist create it.
452  */
453 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
454                                 size_t mon_addr_len, char *options)
455 {
456         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
457         struct ceph_options *ceph_opts;
458         struct rbd_client *rbdc;
459
460         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
461
462         ceph_opts = ceph_parse_options(options, mon_addr,
463                                         mon_addr + mon_addr_len,
464                                         parse_rbd_opts_token, rbd_opts);
465         if (IS_ERR(ceph_opts))
466                 return PTR_ERR(ceph_opts);
467
468         rbdc = rbd_client_find(ceph_opts);
469         if (rbdc) {
470                 /* using an existing client */
471                 ceph_destroy_options(ceph_opts);
472         } else {
473                 rbdc = rbd_client_create(ceph_opts);
474                 if (IS_ERR(rbdc))
475                         return PTR_ERR(rbdc);
476         }
477         rbd_dev->rbd_client = rbdc;
478
479         return 0;
480 }
481
482 /*
483  * Destroy ceph client
484  *
485  * Caller must hold rbd_client_list_lock.
486  */
487 static void rbd_client_release(struct kref *kref)
488 {
489         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
490
491         dout("rbd_release_client %p\n", rbdc);
492         spin_lock(&rbd_client_list_lock);
493         list_del(&rbdc->node);
494         spin_unlock(&rbd_client_list_lock);
495
496         ceph_destroy_client(rbdc->client);
497         kfree(rbdc);
498 }
499
500 /*
501  * Drop reference to ceph client node. If it's not referenced anymore, release
502  * it.
503  */
504 static void rbd_put_client(struct rbd_device *rbd_dev)
505 {
506         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
507         rbd_dev->rbd_client = NULL;
508 }
509
510 /*
511  * Destroy requests collection
512  */
513 static void rbd_coll_release(struct kref *kref)
514 {
515         struct rbd_req_coll *coll =
516                 container_of(kref, struct rbd_req_coll, kref);
517
518         dout("rbd_coll_release %p\n", coll);
519         kfree(coll);
520 }
521
522 static bool rbd_image_format_valid(u32 image_format)
523 {
524         return image_format == 1 || image_format == 2;
525 }
526
527 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
528 {
529         size_t size;
530         u32 snap_count;
531
532         /* The header has to start with the magic rbd header text */
533         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
534                 return false;
535
536         /*
537          * The size of a snapshot header has to fit in a size_t, and
538          * that limits the number of snapshots.
539          */
540         snap_count = le32_to_cpu(ondisk->snap_count);
541         size = SIZE_MAX - sizeof (struct ceph_snap_context);
542         if (snap_count > size / sizeof (__le64))
543                 return false;
544
545         /*
546          * Not only that, but the size of the entire the snapshot
547          * header must also be representable in a size_t.
548          */
549         size -= snap_count * sizeof (__le64);
550         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
551                 return false;
552
553         return true;
554 }
555
556 /*
557  * Create a new header structure, translate header format from the on-disk
558  * header.
559  */
560 static int rbd_header_from_disk(struct rbd_image_header *header,
561                                  struct rbd_image_header_ondisk *ondisk)
562 {
563         u32 snap_count;
564         size_t len;
565         size_t size;
566         u32 i;
567
568         memset(header, 0, sizeof (*header));
569
570         snap_count = le32_to_cpu(ondisk->snap_count);
571
572         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
573         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
574         if (!header->object_prefix)
575                 return -ENOMEM;
576         memcpy(header->object_prefix, ondisk->object_prefix, len);
577         header->object_prefix[len] = '\0';
578
579         if (snap_count) {
580                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
581
582                 /* Save a copy of the snapshot names */
583
584                 if (snap_names_len > (u64) SIZE_MAX)
585                         return -EIO;
586                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
587                 if (!header->snap_names)
588                         goto out_err;
589                 /*
590                  * Note that rbd_dev_v1_header_read() guarantees
591                  * the ondisk buffer we're working with has
592                  * snap_names_len bytes beyond the end of the
593                  * snapshot id array, this memcpy() is safe.
594                  */
595                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
596                         snap_names_len);
597
598                 /* Record each snapshot's size */
599
600                 size = snap_count * sizeof (*header->snap_sizes);
601                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602                 if (!header->snap_sizes)
603                         goto out_err;
604                 for (i = 0; i < snap_count; i++)
605                         header->snap_sizes[i] =
606                                 le64_to_cpu(ondisk->snaps[i].image_size);
607         } else {
608                 WARN_ON(ondisk->snap_names_len);
609                 header->snap_names = NULL;
610                 header->snap_sizes = NULL;
611         }
612
613         header->features = 0;   /* No features support in v1 images */
614         header->obj_order = ondisk->options.order;
615         header->crypt_type = ondisk->options.crypt_type;
616         header->comp_type = ondisk->options.comp_type;
617
618         /* Allocate and fill in the snapshot context */
619
620         header->image_size = le64_to_cpu(ondisk->image_size);
621         size = sizeof (struct ceph_snap_context);
622         size += snap_count * sizeof (header->snapc->snaps[0]);
623         header->snapc = kzalloc(size, GFP_KERNEL);
624         if (!header->snapc)
625                 goto out_err;
626
627         atomic_set(&header->snapc->nref, 1);
628         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
629         header->snapc->num_snaps = snap_count;
630         for (i = 0; i < snap_count; i++)
631                 header->snapc->snaps[i] =
632                         le64_to_cpu(ondisk->snaps[i].id);
633
634         return 0;
635
636 out_err:
637         kfree(header->snap_sizes);
638         header->snap_sizes = NULL;
639         kfree(header->snap_names);
640         header->snap_names = NULL;
641         kfree(header->object_prefix);
642         header->object_prefix = NULL;
643
644         return -ENOMEM;
645 }
646
647 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
648 {
649
650         struct rbd_snap *snap;
651
652         list_for_each_entry(snap, &rbd_dev->snaps, node) {
653                 if (!strcmp(snap_name, snap->name)) {
654                         rbd_dev->mapping.snap_id = snap->id;
655                         rbd_dev->mapping.size = snap->size;
656                         rbd_dev->mapping.features = snap->features;
657
658                         return 0;
659                 }
660         }
661
662         return -ENOENT;
663 }
664
665 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
666 {
667         int ret;
668
669         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
670                     sizeof (RBD_SNAP_HEAD_NAME))) {
671                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
672                 rbd_dev->mapping.size = rbd_dev->header.image_size;
673                 rbd_dev->mapping.features = rbd_dev->header.features;
674                 rbd_dev->mapping.snap_exists = false;
675                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
676                 ret = 0;
677         } else {
678                 ret = snap_by_name(rbd_dev, snap_name);
679                 if (ret < 0)
680                         goto done;
681                 rbd_dev->mapping.snap_exists = true;
682                 rbd_dev->mapping.read_only = true;
683         }
684         rbd_dev->mapping.snap_name = snap_name;
685 done:
686         return ret;
687 }
688
689 static void rbd_header_free(struct rbd_image_header *header)
690 {
691         kfree(header->object_prefix);
692         header->object_prefix = NULL;
693         kfree(header->snap_sizes);
694         header->snap_sizes = NULL;
695         kfree(header->snap_names);
696         header->snap_names = NULL;
697         ceph_put_snap_context(header->snapc);
698         header->snapc = NULL;
699 }
700
701 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
702 {
703         char *name;
704         u64 segment;
705         int ret;
706
707         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
708         if (!name)
709                 return NULL;
710         segment = offset >> rbd_dev->header.obj_order;
711         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
712                         rbd_dev->header.object_prefix, segment);
713         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
714                 pr_err("error formatting segment name for #%llu (%d)\n",
715                         segment, ret);
716                 kfree(name);
717                 name = NULL;
718         }
719
720         return name;
721 }
722
723 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
724 {
725         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
726
727         return offset & (segment_size - 1);
728 }
729
730 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
731                                 u64 offset, u64 length)
732 {
733         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
734
735         offset &= segment_size - 1;
736
737         rbd_assert(length <= U64_MAX - offset);
738         if (offset + length > segment_size)
739                 length = segment_size - offset;
740
741         return length;
742 }
743
744 static int rbd_get_num_segments(struct rbd_image_header *header,
745                                 u64 ofs, u64 len)
746 {
747         u64 start_seg;
748         u64 end_seg;
749
750         if (!len)
751                 return 0;
752         if (len - 1 > U64_MAX - ofs)
753                 return -ERANGE;
754
755         start_seg = ofs >> header->obj_order;
756         end_seg = (ofs + len - 1) >> header->obj_order;
757
758         return end_seg - start_seg + 1;
759 }
760
761 /*
762  * returns the size of an object in the image
763  */
764 static u64 rbd_obj_bytes(struct rbd_image_header *header)
765 {
766         return 1 << header->obj_order;
767 }
768
769 /*
770  * bio helpers
771  */
772
773 static void bio_chain_put(struct bio *chain)
774 {
775         struct bio *tmp;
776
777         while (chain) {
778                 tmp = chain;
779                 chain = chain->bi_next;
780                 bio_put(tmp);
781         }
782 }
783
784 /*
785  * zeros a bio chain, starting at specific offset
786  */
787 static void zero_bio_chain(struct bio *chain, int start_ofs)
788 {
789         struct bio_vec *bv;
790         unsigned long flags;
791         void *buf;
792         int i;
793         int pos = 0;
794
795         while (chain) {
796                 bio_for_each_segment(bv, chain, i) {
797                         if (pos + bv->bv_len > start_ofs) {
798                                 int remainder = max(start_ofs - pos, 0);
799                                 buf = bvec_kmap_irq(bv, &flags);
800                                 memset(buf + remainder, 0,
801                                        bv->bv_len - remainder);
802                                 bvec_kunmap_irq(buf, &flags);
803                         }
804                         pos += bv->bv_len;
805                 }
806
807                 chain = chain->bi_next;
808         }
809 }
810
811 /*
812  * bio_chain_clone - clone a chain of bios up to a certain length.
813  * might return a bio_pair that will need to be released.
814  */
815 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
816                                    struct bio_pair **bp,
817                                    int len, gfp_t gfpmask)
818 {
819         struct bio *old_chain = *old;
820         struct bio *new_chain = NULL;
821         struct bio *tail;
822         int total = 0;
823
824         if (*bp) {
825                 bio_pair_release(*bp);
826                 *bp = NULL;
827         }
828
829         while (old_chain && (total < len)) {
830                 struct bio *tmp;
831
832                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
833                 if (!tmp)
834                         goto err_out;
835                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
836
837                 if (total + old_chain->bi_size > len) {
838                         struct bio_pair *bp;
839
840                         /*
841                          * this split can only happen with a single paged bio,
842                          * split_bio will BUG_ON if this is not the case
843                          */
844                         dout("bio_chain_clone split! total=%d remaining=%d"
845                              "bi_size=%u\n",
846                              total, len - total, old_chain->bi_size);
847
848                         /* split the bio. We'll release it either in the next
849                            call, or it will have to be released outside */
850                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
851                         if (!bp)
852                                 goto err_out;
853
854                         __bio_clone(tmp, &bp->bio1);
855
856                         *next = &bp->bio2;
857                 } else {
858                         __bio_clone(tmp, old_chain);
859                         *next = old_chain->bi_next;
860                 }
861
862                 tmp->bi_bdev = NULL;
863                 tmp->bi_next = NULL;
864                 if (new_chain)
865                         tail->bi_next = tmp;
866                 else
867                         new_chain = tmp;
868                 tail = tmp;
869                 old_chain = old_chain->bi_next;
870
871                 total += tmp->bi_size;
872         }
873
874         rbd_assert(total == len);
875
876         *old = old_chain;
877
878         return new_chain;
879
880 err_out:
881         dout("bio_chain_clone with err\n");
882         bio_chain_put(new_chain);
883         return NULL;
884 }
885
886 /*
887  * helpers for osd request op vectors.
888  */
889 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
890                                         int opcode, u32 payload_len)
891 {
892         struct ceph_osd_req_op *ops;
893
894         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
895         if (!ops)
896                 return NULL;
897
898         ops[0].op = opcode;
899
900         /*
901          * op extent offset and length will be set later on
902          * in calc_raw_layout()
903          */
904         ops[0].payload_len = payload_len;
905
906         return ops;
907 }
908
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
913
914 static void rbd_coll_end_req_index(struct request *rq,
915                                    struct rbd_req_coll *coll,
916                                    int index,
917                                    int ret, u64 len)
918 {
919         struct request_queue *q;
920         int min, max, i;
921
922         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
923              coll, index, ret, (unsigned long long) len);
924
925         if (!rq)
926                 return;
927
928         if (!coll) {
929                 blk_end_request(rq, ret, len);
930                 return;
931         }
932
933         q = rq->q;
934
935         spin_lock_irq(q->queue_lock);
936         coll->status[index].done = 1;
937         coll->status[index].rc = ret;
938         coll->status[index].bytes = len;
939         max = min = coll->num_done;
940         while (max < coll->total && coll->status[max].done)
941                 max++;
942
943         for (i = min; i<max; i++) {
944                 __blk_end_request(rq, coll->status[i].rc,
945                                   coll->status[i].bytes);
946                 coll->num_done++;
947                 kref_put(&coll->kref, rbd_coll_release);
948         }
949         spin_unlock_irq(q->queue_lock);
950 }
951
952 static void rbd_coll_end_req(struct rbd_request *req,
953                              int ret, u64 len)
954 {
955         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
956 }
957
958 /*
959  * Send ceph osd request
960  */
961 static int rbd_do_request(struct request *rq,
962                           struct rbd_device *rbd_dev,
963                           struct ceph_snap_context *snapc,
964                           u64 snapid,
965                           const char *object_name, u64 ofs, u64 len,
966                           struct bio *bio,
967                           struct page **pages,
968                           int num_pages,
969                           int flags,
970                           struct ceph_osd_req_op *ops,
971                           struct rbd_req_coll *coll,
972                           int coll_index,
973                           void (*rbd_cb)(struct ceph_osd_request *req,
974                                          struct ceph_msg *msg),
975                           struct ceph_osd_request **linger_req,
976                           u64 *ver)
977 {
978         struct ceph_osd_request *req;
979         struct ceph_file_layout *layout;
980         int ret;
981         u64 bno;
982         struct timespec mtime = CURRENT_TIME;
983         struct rbd_request *req_data;
984         struct ceph_osd_request_head *reqhead;
985         struct ceph_osd_client *osdc;
986
987         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
988         if (!req_data) {
989                 if (coll)
990                         rbd_coll_end_req_index(rq, coll, coll_index,
991                                                -ENOMEM, len);
992                 return -ENOMEM;
993         }
994
995         if (coll) {
996                 req_data->coll = coll;
997                 req_data->coll_index = coll_index;
998         }
999
1000         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
1001                 (unsigned long long) ofs, (unsigned long long) len);
1002
1003         osdc = &rbd_dev->rbd_client->client->osdc;
1004         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1005                                         false, GFP_NOIO, pages, bio);
1006         if (!req) {
1007                 ret = -ENOMEM;
1008                 goto done_pages;
1009         }
1010
1011         req->r_callback = rbd_cb;
1012
1013         req_data->rq = rq;
1014         req_data->bio = bio;
1015         req_data->pages = pages;
1016         req_data->len = len;
1017
1018         req->r_priv = req_data;
1019
1020         reqhead = req->r_request->front.iov_base;
1021         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1022
1023         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1024         req->r_oid_len = strlen(req->r_oid);
1025
1026         layout = &req->r_file_layout;
1027         memset(layout, 0, sizeof(*layout));
1028         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1029         layout->fl_stripe_count = cpu_to_le32(1);
1030         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1031         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1032         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1033                                    req, ops);
1034         rbd_assert(ret == 0);
1035
1036         ceph_osdc_build_request(req, ofs, &len,
1037                                 ops,
1038                                 snapc,
1039                                 &mtime,
1040                                 req->r_oid, req->r_oid_len);
1041
1042         if (linger_req) {
1043                 ceph_osdc_set_request_linger(osdc, req);
1044                 *linger_req = req;
1045         }
1046
1047         ret = ceph_osdc_start_request(osdc, req, false);
1048         if (ret < 0)
1049                 goto done_err;
1050
1051         if (!rbd_cb) {
1052                 ret = ceph_osdc_wait_request(osdc, req);
1053                 if (ver)
1054                         *ver = le64_to_cpu(req->r_reassert_version.version);
1055                 dout("reassert_ver=%llu\n",
1056                         (unsigned long long)
1057                                 le64_to_cpu(req->r_reassert_version.version));
1058                 ceph_osdc_put_request(req);
1059         }
1060         return ret;
1061
1062 done_err:
1063         bio_chain_put(req_data->bio);
1064         ceph_osdc_put_request(req);
1065 done_pages:
1066         rbd_coll_end_req(req_data, ret, len);
1067         kfree(req_data);
1068         return ret;
1069 }
1070
1071 /*
1072  * Ceph osd op callback
1073  */
1074 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1075 {
1076         struct rbd_request *req_data = req->r_priv;
1077         struct ceph_osd_reply_head *replyhead;
1078         struct ceph_osd_op *op;
1079         __s32 rc;
1080         u64 bytes;
1081         int read_op;
1082
1083         /* parse reply */
1084         replyhead = msg->front.iov_base;
1085         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1086         op = (void *)(replyhead + 1);
1087         rc = le32_to_cpu(replyhead->result);
1088         bytes = le64_to_cpu(op->extent.length);
1089         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1090
1091         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1092                 (unsigned long long) bytes, read_op, (int) rc);
1093
1094         if (rc == -ENOENT && read_op) {
1095                 zero_bio_chain(req_data->bio, 0);
1096                 rc = 0;
1097         } else if (rc == 0 && read_op && bytes < req_data->len) {
1098                 zero_bio_chain(req_data->bio, bytes);
1099                 bytes = req_data->len;
1100         }
1101
1102         rbd_coll_end_req(req_data, rc, bytes);
1103
1104         if (req_data->bio)
1105                 bio_chain_put(req_data->bio);
1106
1107         ceph_osdc_put_request(req);
1108         kfree(req_data);
1109 }
1110
1111 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1112 {
1113         ceph_osdc_put_request(req);
1114 }
1115
1116 /*
1117  * Do a synchronous ceph osd operation
1118  */
1119 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1120                            struct ceph_snap_context *snapc,
1121                            u64 snapid,
1122                            int flags,
1123                            struct ceph_osd_req_op *ops,
1124                            const char *object_name,
1125                            u64 ofs, u64 inbound_size,
1126                            char *inbound,
1127                            struct ceph_osd_request **linger_req,
1128                            u64 *ver)
1129 {
1130         int ret;
1131         struct page **pages;
1132         int num_pages;
1133
1134         rbd_assert(ops != NULL);
1135
1136         num_pages = calc_pages_for(ofs, inbound_size);
1137         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1138         if (IS_ERR(pages))
1139                 return PTR_ERR(pages);
1140
1141         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1142                           object_name, ofs, inbound_size, NULL,
1143                           pages, num_pages,
1144                           flags,
1145                           ops,
1146                           NULL, 0,
1147                           NULL,
1148                           linger_req, ver);
1149         if (ret < 0)
1150                 goto done;
1151
1152         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1153                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1154
1155 done:
1156         ceph_release_page_vector(pages, num_pages);
1157         return ret;
1158 }
1159
1160 /*
1161  * Do an asynchronous ceph osd operation
1162  */
1163 static int rbd_do_op(struct request *rq,
1164                      struct rbd_device *rbd_dev,
1165                      struct ceph_snap_context *snapc,
1166                      u64 snapid,
1167                      u64 ofs, u64 len,
1168                      struct bio *bio,
1169                      struct rbd_req_coll *coll,
1170                      int coll_index)
1171 {
1172         char *seg_name;
1173         u64 seg_ofs;
1174         u64 seg_len;
1175         int ret;
1176         struct ceph_osd_req_op *ops;
1177         u32 payload_len;
1178         int opcode;
1179         int flags;
1180
1181         seg_name = rbd_segment_name(rbd_dev, ofs);
1182         if (!seg_name)
1183                 return -ENOMEM;
1184         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1185         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1186
1187         if (rq_data_dir(rq) == WRITE) {
1188                 opcode = CEPH_OSD_OP_WRITE;
1189                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1190                 payload_len = seg_len;
1191         } else {
1192                 opcode = CEPH_OSD_OP_READ;
1193                 flags = CEPH_OSD_FLAG_READ;
1194                 payload_len = 0;
1195         }
1196
1197         ret = -ENOMEM;
1198         ops = rbd_create_rw_ops(1, opcode, payload_len);
1199         if (!ops)
1200                 goto done;
1201
1202         /* we've taken care of segment sizes earlier when we
1203            cloned the bios. We should never have a segment
1204            truncated at this point */
1205         rbd_assert(seg_len == len);
1206
1207         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1208                              seg_name, seg_ofs, seg_len,
1209                              bio,
1210                              NULL, 0,
1211                              flags,
1212                              ops,
1213                              coll, coll_index,
1214                              rbd_req_cb, 0, NULL);
1215
1216         rbd_destroy_ops(ops);
1217 done:
1218         kfree(seg_name);
1219         return ret;
1220 }
1221
1222 /*
1223  * Request sync osd read
1224  */
1225 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1226                           u64 snapid,
1227                           const char *object_name,
1228                           u64 ofs, u64 len,
1229                           char *buf,
1230                           u64 *ver)
1231 {
1232         struct ceph_osd_req_op *ops;
1233         int ret;
1234
1235         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1236         if (!ops)
1237                 return -ENOMEM;
1238
1239         ret = rbd_req_sync_op(rbd_dev, NULL,
1240                                snapid,
1241                                CEPH_OSD_FLAG_READ,
1242                                ops, object_name, ofs, len, buf, NULL, ver);
1243         rbd_destroy_ops(ops);
1244
1245         return ret;
1246 }
1247
1248 /*
1249  * Request sync osd watch
1250  */
1251 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1252                                    u64 ver,
1253                                    u64 notify_id)
1254 {
1255         struct ceph_osd_req_op *ops;
1256         int ret;
1257
1258         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1259         if (!ops)
1260                 return -ENOMEM;
1261
1262         ops[0].watch.ver = cpu_to_le64(ver);
1263         ops[0].watch.cookie = notify_id;
1264         ops[0].watch.flag = 0;
1265
1266         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1267                           rbd_dev->header_name, 0, 0, NULL,
1268                           NULL, 0,
1269                           CEPH_OSD_FLAG_READ,
1270                           ops,
1271                           NULL, 0,
1272                           rbd_simple_req_cb, 0, NULL);
1273
1274         rbd_destroy_ops(ops);
1275         return ret;
1276 }
1277
1278 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1279 {
1280         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1281         u64 hver;
1282         int rc;
1283
1284         if (!rbd_dev)
1285                 return;
1286
1287         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1288                 rbd_dev->header_name, (unsigned long long) notify_id,
1289                 (unsigned int) opcode);
1290         rc = rbd_dev_refresh(rbd_dev, &hver);
1291         if (rc)
1292                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1293                            " update snaps: %d\n", rbd_dev->major, rc);
1294
1295         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1296 }
1297
1298 /*
1299  * Request sync osd watch
1300  */
1301 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1302 {
1303         struct ceph_osd_req_op *ops;
1304         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1305         int ret;
1306
1307         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1308         if (!ops)
1309                 return -ENOMEM;
1310
1311         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1312                                      (void *)rbd_dev, &rbd_dev->watch_event);
1313         if (ret < 0)
1314                 goto fail;
1315
1316         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1317         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1318         ops[0].watch.flag = 1;
1319
1320         ret = rbd_req_sync_op(rbd_dev, NULL,
1321                               CEPH_NOSNAP,
1322                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1323                               ops,
1324                               rbd_dev->header_name,
1325                               0, 0, NULL,
1326                               &rbd_dev->watch_request, NULL);
1327
1328         if (ret < 0)
1329                 goto fail_event;
1330
1331         rbd_destroy_ops(ops);
1332         return 0;
1333
1334 fail_event:
1335         ceph_osdc_cancel_event(rbd_dev->watch_event);
1336         rbd_dev->watch_event = NULL;
1337 fail:
1338         rbd_destroy_ops(ops);
1339         return ret;
1340 }
1341
1342 /*
1343  * Request sync osd unwatch
1344  */
1345 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1346 {
1347         struct ceph_osd_req_op *ops;
1348         int ret;
1349
1350         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1351         if (!ops)
1352                 return -ENOMEM;
1353
1354         ops[0].watch.ver = 0;
1355         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1356         ops[0].watch.flag = 0;
1357
1358         ret = rbd_req_sync_op(rbd_dev, NULL,
1359                               CEPH_NOSNAP,
1360                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1361                               ops,
1362                               rbd_dev->header_name,
1363                               0, 0, NULL, NULL, NULL);
1364
1365
1366         rbd_destroy_ops(ops);
1367         ceph_osdc_cancel_event(rbd_dev->watch_event);
1368         rbd_dev->watch_event = NULL;
1369         return ret;
1370 }
1371
1372 /*
1373  * Synchronous osd object method call
1374  */
1375 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1376                              const char *object_name,
1377                              const char *class_name,
1378                              const char *method_name,
1379                              const char *outbound,
1380                              size_t outbound_size,
1381                              char *inbound,
1382                              size_t inbound_size,
1383                              int flags,
1384                              u64 *ver)
1385 {
1386         struct ceph_osd_req_op *ops;
1387         int class_name_len = strlen(class_name);
1388         int method_name_len = strlen(method_name);
1389         int payload_size;
1390         int ret;
1391
1392         /*
1393          * Any input parameters required by the method we're calling
1394          * will be sent along with the class and method names as
1395          * part of the message payload.  That data and its size are
1396          * supplied via the indata and indata_len fields (named from
1397          * the perspective of the server side) in the OSD request
1398          * operation.
1399          */
1400         payload_size = class_name_len + method_name_len + outbound_size;
1401         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1402         if (!ops)
1403                 return -ENOMEM;
1404
1405         ops[0].cls.class_name = class_name;
1406         ops[0].cls.class_len = (__u8) class_name_len;
1407         ops[0].cls.method_name = method_name;
1408         ops[0].cls.method_len = (__u8) method_name_len;
1409         ops[0].cls.argc = 0;
1410         ops[0].cls.indata = outbound;
1411         ops[0].cls.indata_len = outbound_size;
1412
1413         ret = rbd_req_sync_op(rbd_dev, NULL,
1414                                CEPH_NOSNAP,
1415                                flags, ops,
1416                                object_name, 0, inbound_size, inbound,
1417                                NULL, ver);
1418
1419         rbd_destroy_ops(ops);
1420
1421         dout("cls_exec returned %d\n", ret);
1422         return ret;
1423 }
1424
1425 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1426 {
1427         struct rbd_req_coll *coll =
1428                         kzalloc(sizeof(struct rbd_req_coll) +
1429                                 sizeof(struct rbd_req_status) * num_reqs,
1430                                 GFP_ATOMIC);
1431
1432         if (!coll)
1433                 return NULL;
1434         coll->total = num_reqs;
1435         kref_init(&coll->kref);
1436         return coll;
1437 }
1438
1439 /*
1440  * block device queue callback
1441  */
1442 static void rbd_rq_fn(struct request_queue *q)
1443 {
1444         struct rbd_device *rbd_dev = q->queuedata;
1445         struct request *rq;
1446         struct bio_pair *bp = NULL;
1447
1448         while ((rq = blk_fetch_request(q))) {
1449                 struct bio *bio;
1450                 struct bio *rq_bio, *next_bio = NULL;
1451                 bool do_write;
1452                 unsigned int size;
1453                 u64 op_size = 0;
1454                 u64 ofs;
1455                 int num_segs, cur_seg = 0;
1456                 struct rbd_req_coll *coll;
1457                 struct ceph_snap_context *snapc;
1458
1459                 dout("fetched request\n");
1460
1461                 /* filter out block requests we don't understand */
1462                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1463                         __blk_end_request_all(rq, 0);
1464                         continue;
1465                 }
1466
1467                 /* deduce our operation (read, write) */
1468                 do_write = (rq_data_dir(rq) == WRITE);
1469
1470                 size = blk_rq_bytes(rq);
1471                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1472                 rq_bio = rq->bio;
1473                 if (do_write && rbd_dev->mapping.read_only) {
1474                         __blk_end_request_all(rq, -EROFS);
1475                         continue;
1476                 }
1477
1478                 spin_unlock_irq(q->queue_lock);
1479
1480                 down_read(&rbd_dev->header_rwsem);
1481
1482                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1483                                 !rbd_dev->mapping.snap_exists) {
1484                         up_read(&rbd_dev->header_rwsem);
1485                         dout("request for non-existent snapshot");
1486                         spin_lock_irq(q->queue_lock);
1487                         __blk_end_request_all(rq, -ENXIO);
1488                         continue;
1489                 }
1490
1491                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1492
1493                 up_read(&rbd_dev->header_rwsem);
1494
1495                 dout("%s 0x%x bytes at 0x%llx\n",
1496                      do_write ? "write" : "read",
1497                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1498
1499                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1500                 if (num_segs <= 0) {
1501                         spin_lock_irq(q->queue_lock);
1502                         __blk_end_request_all(rq, num_segs);
1503                         ceph_put_snap_context(snapc);
1504                         continue;
1505                 }
1506                 coll = rbd_alloc_coll(num_segs);
1507                 if (!coll) {
1508                         spin_lock_irq(q->queue_lock);
1509                         __blk_end_request_all(rq, -ENOMEM);
1510                         ceph_put_snap_context(snapc);
1511                         continue;
1512                 }
1513
1514                 do {
1515                         /* a bio clone to be passed down to OSD req */
1516                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1517                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1518                         kref_get(&coll->kref);
1519                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1520                                               op_size, GFP_ATOMIC);
1521                         if (!bio) {
1522                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1523                                                        -ENOMEM, op_size);
1524                                 goto next_seg;
1525                         }
1526
1527                         /* init OSD command: write or read */
1528                         if (do_write)
1529                                 (void) rbd_do_op(rq, rbd_dev,
1530                                                 snapc, CEPH_NOSNAP,
1531                                                 ofs, op_size, bio,
1532                                                 coll, cur_seg);
1533                         else
1534                                 (void) rbd_do_op(rq, rbd_dev,
1535                                                 NULL, rbd_dev->mapping.snap_id,
1536                                                 ofs, op_size, bio,
1537                                                 coll, cur_seg);
1538 next_seg:
1539                         size -= op_size;
1540                         ofs += op_size;
1541
1542                         cur_seg++;
1543                         rq_bio = next_bio;
1544                 } while (size > 0);
1545                 kref_put(&coll->kref, rbd_coll_release);
1546
1547                 if (bp)
1548                         bio_pair_release(bp);
1549                 spin_lock_irq(q->queue_lock);
1550
1551                 ceph_put_snap_context(snapc);
1552         }
1553 }
1554
1555 /*
1556  * a queue callback. Makes sure that we don't create a bio that spans across
1557  * multiple osd objects. One exception would be with a single page bios,
1558  * which we handle later at bio_chain_clone
1559  */
1560 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1561                           struct bio_vec *bvec)
1562 {
1563         struct rbd_device *rbd_dev = q->queuedata;
1564         unsigned int chunk_sectors;
1565         sector_t sector;
1566         unsigned int bio_sectors;
1567         int max;
1568
1569         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1570         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1571         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1572
1573         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1574                                  + bio_sectors)) << SECTOR_SHIFT;
1575         if (max < 0)
1576                 max = 0; /* bio_add cannot handle a negative return */
1577         if (max <= bvec->bv_len && bio_sectors == 0)
1578                 return bvec->bv_len;
1579         return max;
1580 }
1581
1582 static void rbd_free_disk(struct rbd_device *rbd_dev)
1583 {
1584         struct gendisk *disk = rbd_dev->disk;
1585
1586         if (!disk)
1587                 return;
1588
1589         if (disk->flags & GENHD_FL_UP)
1590                 del_gendisk(disk);
1591         if (disk->queue)
1592                 blk_cleanup_queue(disk->queue);
1593         put_disk(disk);
1594 }
1595
1596 /*
1597  * Read the complete header for the given rbd device.
1598  *
1599  * Returns a pointer to a dynamically-allocated buffer containing
1600  * the complete and validated header.  Caller can pass the address
1601  * of a variable that will be filled in with the version of the
1602  * header object at the time it was read.
1603  *
1604  * Returns a pointer-coded errno if a failure occurs.
1605  */
1606 static struct rbd_image_header_ondisk *
1607 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1608 {
1609         struct rbd_image_header_ondisk *ondisk = NULL;
1610         u32 snap_count = 0;
1611         u64 names_size = 0;
1612         u32 want_count;
1613         int ret;
1614
1615         /*
1616          * The complete header will include an array of its 64-bit
1617          * snapshot ids, followed by the names of those snapshots as
1618          * a contiguous block of NUL-terminated strings.  Note that
1619          * the number of snapshots could change by the time we read
1620          * it in, in which case we re-read it.
1621          */
1622         do {
1623                 size_t size;
1624
1625                 kfree(ondisk);
1626
1627                 size = sizeof (*ondisk);
1628                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1629                 size += names_size;
1630                 ondisk = kmalloc(size, GFP_KERNEL);
1631                 if (!ondisk)
1632                         return ERR_PTR(-ENOMEM);
1633
1634                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1635                                        rbd_dev->header_name,
1636                                        0, size,
1637                                        (char *) ondisk, version);
1638
1639                 if (ret < 0)
1640                         goto out_err;
1641                 if (WARN_ON((size_t) ret < size)) {
1642                         ret = -ENXIO;
1643                         pr_warning("short header read for image %s"
1644                                         " (want %zd got %d)\n",
1645                                 rbd_dev->image_name, size, ret);
1646                         goto out_err;
1647                 }
1648                 if (!rbd_dev_ondisk_valid(ondisk)) {
1649                         ret = -ENXIO;
1650                         pr_warning("invalid header for image %s\n",
1651                                 rbd_dev->image_name);
1652                         goto out_err;
1653                 }
1654
1655                 names_size = le64_to_cpu(ondisk->snap_names_len);
1656                 want_count = snap_count;
1657                 snap_count = le32_to_cpu(ondisk->snap_count);
1658         } while (snap_count != want_count);
1659
1660         return ondisk;
1661
1662 out_err:
1663         kfree(ondisk);
1664
1665         return ERR_PTR(ret);
1666 }
1667
1668 /*
1669  * reload the ondisk the header
1670  */
1671 static int rbd_read_header(struct rbd_device *rbd_dev,
1672                            struct rbd_image_header *header)
1673 {
1674         struct rbd_image_header_ondisk *ondisk;
1675         u64 ver = 0;
1676         int ret;
1677
1678         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1679         if (IS_ERR(ondisk))
1680                 return PTR_ERR(ondisk);
1681         ret = rbd_header_from_disk(header, ondisk);
1682         if (ret >= 0)
1683                 header->obj_version = ver;
1684         kfree(ondisk);
1685
1686         return ret;
1687 }
1688
1689 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1690 {
1691         struct rbd_snap *snap;
1692         struct rbd_snap *next;
1693
1694         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1695                 __rbd_remove_snap_dev(snap);
1696 }
1697
1698 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1699 {
1700         sector_t size;
1701
1702         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1703                 return;
1704
1705         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1706         dout("setting size to %llu sectors", (unsigned long long) size);
1707         rbd_dev->mapping.size = (u64) size;
1708         set_capacity(rbd_dev->disk, size);
1709 }
1710
1711 /*
1712  * only read the first part of the ondisk header, without the snaps info
1713  */
1714 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1715 {
1716         int ret;
1717         struct rbd_image_header h;
1718
1719         ret = rbd_read_header(rbd_dev, &h);
1720         if (ret < 0)
1721                 return ret;
1722
1723         down_write(&rbd_dev->header_rwsem);
1724
1725         /* Update image size, and check for resize of mapped image */
1726         rbd_dev->header.image_size = h.image_size;
1727         rbd_update_mapping_size(rbd_dev);
1728
1729         /* rbd_dev->header.object_prefix shouldn't change */
1730         kfree(rbd_dev->header.snap_sizes);
1731         kfree(rbd_dev->header.snap_names);
1732         /* osd requests may still refer to snapc */
1733         ceph_put_snap_context(rbd_dev->header.snapc);
1734
1735         if (hver)
1736                 *hver = h.obj_version;
1737         rbd_dev->header.obj_version = h.obj_version;
1738         rbd_dev->header.image_size = h.image_size;
1739         rbd_dev->header.snapc = h.snapc;
1740         rbd_dev->header.snap_names = h.snap_names;
1741         rbd_dev->header.snap_sizes = h.snap_sizes;
1742         /* Free the extra copy of the object prefix */
1743         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1744         kfree(h.object_prefix);
1745
1746         ret = rbd_dev_snaps_update(rbd_dev);
1747         if (!ret)
1748                 ret = rbd_dev_snaps_register(rbd_dev);
1749
1750         up_write(&rbd_dev->header_rwsem);
1751
1752         return ret;
1753 }
1754
1755 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1756 {
1757         int ret;
1758
1759         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1760         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1761         if (rbd_dev->image_format == 1)
1762                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1763         else
1764                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1765         mutex_unlock(&ctl_mutex);
1766
1767         return ret;
1768 }
1769
1770 static int rbd_init_disk(struct rbd_device *rbd_dev)
1771 {
1772         struct gendisk *disk;
1773         struct request_queue *q;
1774         u64 segment_size;
1775
1776         /* create gendisk info */
1777         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1778         if (!disk)
1779                 return -ENOMEM;
1780
1781         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1782                  rbd_dev->dev_id);
1783         disk->major = rbd_dev->major;
1784         disk->first_minor = 0;
1785         disk->fops = &rbd_bd_ops;
1786         disk->private_data = rbd_dev;
1787
1788         /* init rq */
1789         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1790         if (!q)
1791                 goto out_disk;
1792
1793         /* We use the default size, but let's be explicit about it. */
1794         blk_queue_physical_block_size(q, SECTOR_SIZE);
1795
1796         /* set io sizes to object size */
1797         segment_size = rbd_obj_bytes(&rbd_dev->header);
1798         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1799         blk_queue_max_segment_size(q, segment_size);
1800         blk_queue_io_min(q, segment_size);
1801         blk_queue_io_opt(q, segment_size);
1802
1803         blk_queue_merge_bvec(q, rbd_merge_bvec);
1804         disk->queue = q;
1805
1806         q->queuedata = rbd_dev;
1807
1808         rbd_dev->disk = disk;
1809
1810         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1811
1812         return 0;
1813 out_disk:
1814         put_disk(disk);
1815
1816         return -ENOMEM;
1817 }
1818
1819 /*
1820   sysfs
1821 */
1822
1823 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1824 {
1825         return container_of(dev, struct rbd_device, dev);
1826 }
1827
1828 static ssize_t rbd_size_show(struct device *dev,
1829                              struct device_attribute *attr, char *buf)
1830 {
1831         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1832         sector_t size;
1833
1834         down_read(&rbd_dev->header_rwsem);
1835         size = get_capacity(rbd_dev->disk);
1836         up_read(&rbd_dev->header_rwsem);
1837
1838         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1839 }
1840
1841 /*
1842  * Note this shows the features for whatever's mapped, which is not
1843  * necessarily the base image.
1844  */
1845 static ssize_t rbd_features_show(struct device *dev,
1846                              struct device_attribute *attr, char *buf)
1847 {
1848         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849
1850         return sprintf(buf, "0x%016llx\n",
1851                         (unsigned long long) rbd_dev->mapping.features);
1852 }
1853
1854 static ssize_t rbd_major_show(struct device *dev,
1855                               struct device_attribute *attr, char *buf)
1856 {
1857         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1858
1859         return sprintf(buf, "%d\n", rbd_dev->major);
1860 }
1861
1862 static ssize_t rbd_client_id_show(struct device *dev,
1863                                   struct device_attribute *attr, char *buf)
1864 {
1865         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1866
1867         return sprintf(buf, "client%lld\n",
1868                         ceph_client_id(rbd_dev->rbd_client->client));
1869 }
1870
1871 static ssize_t rbd_pool_show(struct device *dev,
1872                              struct device_attribute *attr, char *buf)
1873 {
1874         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1875
1876         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1877 }
1878
1879 static ssize_t rbd_pool_id_show(struct device *dev,
1880                              struct device_attribute *attr, char *buf)
1881 {
1882         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1883
1884         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1885 }
1886
1887 static ssize_t rbd_name_show(struct device *dev,
1888                              struct device_attribute *attr, char *buf)
1889 {
1890         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1891
1892         return sprintf(buf, "%s\n", rbd_dev->image_name);
1893 }
1894
1895 static ssize_t rbd_image_id_show(struct device *dev,
1896                              struct device_attribute *attr, char *buf)
1897 {
1898         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1899
1900         return sprintf(buf, "%s\n", rbd_dev->image_id);
1901 }
1902
1903 /*
1904  * Shows the name of the currently-mapped snapshot (or
1905  * RBD_SNAP_HEAD_NAME for the base image).
1906  */
1907 static ssize_t rbd_snap_show(struct device *dev,
1908                              struct device_attribute *attr,
1909                              char *buf)
1910 {
1911         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1912
1913         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1914 }
1915
1916 static ssize_t rbd_image_refresh(struct device *dev,
1917                                  struct device_attribute *attr,
1918                                  const char *buf,
1919                                  size_t size)
1920 {
1921         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1922         int ret;
1923
1924         ret = rbd_dev_refresh(rbd_dev, NULL);
1925
1926         return ret < 0 ? ret : size;
1927 }
1928
/*
 * Device attributes for an rbd device.  All are read-only
 * (S_IRUGO) except "refresh", which is write-only for the owner
 * (S_IWUSR) and has only a store handler.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

/* NULL-terminated list of the attributes declared above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

/* Unnamed attribute group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1962
/*
 * Release callback installed in rbd_device_type.  Intentionally
 * does nothing.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1966
/*
 * Device type assigned to rbd devices in rbd_bus_add_dev(); it
 * supplies the attribute groups defined above.
 */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1972
1973
1974 /*
1975   sysfs - snapshots
1976 */
1977
1978 static ssize_t rbd_snap_size_show(struct device *dev,
1979                                   struct device_attribute *attr,
1980                                   char *buf)
1981 {
1982         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1983
1984         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1985 }
1986
1987 static ssize_t rbd_snap_id_show(struct device *dev,
1988                                 struct device_attribute *attr,
1989                                 char *buf)
1990 {
1991         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1992
1993         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1994 }
1995
1996 static ssize_t rbd_snap_features_show(struct device *dev,
1997                                 struct device_attribute *attr,
1998                                 char *buf)
1999 {
2000         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001
2002         return sprintf(buf, "0x%016llx\n",
2003                         (unsigned long long) snap->features);
2004 }
2005
/* Read-only (S_IRUGO) attributes for each snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

/* Unnamed attribute group wrapping rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2020
2021 static void rbd_snap_dev_release(struct device *dev)
2022 {
2023         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2024         kfree(snap->name);
2025         kfree(snap);
2026 }
2027
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/*
 * Device type assigned to snapshot devices when they are registered
 * (see rbd_register_snap_dev()); rbd_snap_registered() tests for it.
 */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2037
2038 static bool rbd_snap_registered(struct rbd_snap *snap)
2039 {
2040         bool ret = snap->dev.type == &rbd_snap_device_type;
2041         bool reg = device_is_registered(&snap->dev);
2042
2043         rbd_assert(!ret ^ reg);
2044
2045         return ret;
2046 }
2047
2048 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2049 {
2050         list_del(&snap->node);
2051         if (device_is_registered(&snap->dev))
2052                 device_unregister(&snap->dev);
2053 }
2054
2055 static int rbd_register_snap_dev(struct rbd_snap *snap,
2056                                   struct device *parent)
2057 {
2058         struct device *dev = &snap->dev;
2059         int ret;
2060
2061         dev->type = &rbd_snap_device_type;
2062         dev->parent = parent;
2063         dev->release = rbd_snap_dev_release;
2064         dev_set_name(dev, "snap_%s", snap->name);
2065         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2066
2067         ret = device_register(dev);
2068
2069         return ret;
2070 }
2071
2072 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2073                                                 const char *snap_name,
2074                                                 u64 snap_id, u64 snap_size,
2075                                                 u64 snap_features)
2076 {
2077         struct rbd_snap *snap;
2078         int ret;
2079
2080         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2081         if (!snap)
2082                 return ERR_PTR(-ENOMEM);
2083
2084         ret = -ENOMEM;
2085         snap->name = kstrdup(snap_name, GFP_KERNEL);
2086         if (!snap->name)
2087                 goto err;
2088
2089         snap->id = snap_id;
2090         snap->size = snap_size;
2091         snap->features = snap_features;
2092
2093         return snap;
2094
2095 err:
2096         kfree(snap->name);
2097         kfree(snap);
2098
2099         return ERR_PTR(ret);
2100 }
2101
2102 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2103                 u64 *snap_size, u64 *snap_features)
2104 {
2105         char *snap_name;
2106
2107         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2108
2109         *snap_size = rbd_dev->header.snap_sizes[which];
2110         *snap_features = 0;     /* No features for v1 */
2111
2112         /* Skip over names until we find the one we are looking for */
2113
2114         snap_name = rbd_dev->header.snap_names;
2115         while (which--)
2116                 snap_name += strlen(snap_name) + 1;
2117
2118         return snap_name;
2119 }
2120
2121 /*
2122  * Get the size and object order for an image snapshot, or if
2123  * snap_id is CEPH_NOSNAP, gets this information for the base
2124  * image.
2125  */
2126 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2127                                 u8 *order, u64 *snap_size)
2128 {
2129         __le64 snapid = cpu_to_le64(snap_id);
2130         int ret;
2131         struct {
2132                 u8 order;
2133                 __le64 size;
2134         } __attribute__ ((packed)) size_buf = { 0 };
2135
2136         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2137                                 "rbd", "get_size",
2138                                 (char *) &snapid, sizeof (snapid),
2139                                 (char *) &size_buf, sizeof (size_buf),
2140                                 CEPH_OSD_FLAG_READ, NULL);
2141         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2142         if (ret < 0)
2143                 return ret;
2144
2145         *order = size_buf.order;
2146         *snap_size = le64_to_cpu(size_buf.size);
2147
2148         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2149                 (unsigned long long) snap_id, (unsigned int) *order,
2150                 (unsigned long long) *snap_size);
2151
2152         return 0;
2153 }
2154
2155 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2156 {
2157         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2158                                         &rbd_dev->header.obj_order,
2159                                         &rbd_dev->header.image_size);
2160 }
2161
2162 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2163 {
2164         void *reply_buf;
2165         int ret;
2166         void *p;
2167
2168         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2169         if (!reply_buf)
2170                 return -ENOMEM;
2171
2172         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2173                                 "rbd", "get_object_prefix",
2174                                 NULL, 0,
2175                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2176                                 CEPH_OSD_FLAG_READ, NULL);
2177         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2178         if (ret < 0)
2179                 goto out;
2180         ret = 0;    /* rbd_req_sync_exec() can return positive */
2181
2182         p = reply_buf;
2183         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2184                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2185                                                 NULL, GFP_NOIO);
2186
2187         if (IS_ERR(rbd_dev->header.object_prefix)) {
2188                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2189                 rbd_dev->header.object_prefix = NULL;
2190         } else {
2191                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2192         }
2193
2194 out:
2195         kfree(reply_buf);
2196
2197         return ret;
2198 }
2199
2200 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2201                 u64 *snap_features)
2202 {
2203         __le64 snapid = cpu_to_le64(snap_id);
2204         struct {
2205                 __le64 features;
2206                 __le64 incompat;
2207         } features_buf = { 0 };
2208         u64 incompat;
2209         int ret;
2210
2211         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2212                                 "rbd", "get_features",
2213                                 (char *) &snapid, sizeof (snapid),
2214                                 (char *) &features_buf, sizeof (features_buf),
2215                                 CEPH_OSD_FLAG_READ, NULL);
2216         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2217         if (ret < 0)
2218                 return ret;
2219
2220         incompat = le64_to_cpu(features_buf.incompat);
2221         if (incompat & ~RBD_FEATURES_ALL)
2222                 return -ENOTSUPP;
2223
2224         *snap_features = le64_to_cpu(features_buf.features);
2225
2226         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2227                 (unsigned long long) snap_id,
2228                 (unsigned long long) *snap_features,
2229                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2230
2231         return 0;
2232 }
2233
2234 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2235 {
2236         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2237                                                 &rbd_dev->header.features);
2238 }
2239
2240 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2241 {
2242         size_t size;
2243         int ret;
2244         void *reply_buf;
2245         void *p;
2246         void *end;
2247         u64 seq;
2248         u32 snap_count;
2249         struct ceph_snap_context *snapc;
2250         u32 i;
2251
2252         /*
2253          * We'll need room for the seq value (maximum snapshot id),
2254          * snapshot count, and array of that many snapshot ids.
2255          * For now we have a fixed upper limit on the number we're
2256          * prepared to receive.
2257          */
2258         size = sizeof (__le64) + sizeof (__le32) +
2259                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2260         reply_buf = kzalloc(size, GFP_KERNEL);
2261         if (!reply_buf)
2262                 return -ENOMEM;
2263
2264         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2265                                 "rbd", "get_snapcontext",
2266                                 NULL, 0,
2267                                 reply_buf, size,
2268                                 CEPH_OSD_FLAG_READ, ver);
2269         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2270         if (ret < 0)
2271                 goto out;
2272
2273         ret = -ERANGE;
2274         p = reply_buf;
2275         end = (char *) reply_buf + size;
2276         ceph_decode_64_safe(&p, end, seq, out);
2277         ceph_decode_32_safe(&p, end, snap_count, out);
2278
2279         /*
2280          * Make sure the reported number of snapshot ids wouldn't go
2281          * beyond the end of our buffer.  But before checking that,
2282          * make sure the computed size of the snapshot context we
2283          * allocate is representable in a size_t.
2284          */
2285         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2286                                  / sizeof (u64)) {
2287                 ret = -EINVAL;
2288                 goto out;
2289         }
2290         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2291                 goto out;
2292
2293         size = sizeof (struct ceph_snap_context) +
2294                                 snap_count * sizeof (snapc->snaps[0]);
2295         snapc = kmalloc(size, GFP_KERNEL);
2296         if (!snapc) {
2297                 ret = -ENOMEM;
2298                 goto out;
2299         }
2300
2301         atomic_set(&snapc->nref, 1);
2302         snapc->seq = seq;
2303         snapc->num_snaps = snap_count;
2304         for (i = 0; i < snap_count; i++)
2305                 snapc->snaps[i] = ceph_decode_64(&p);
2306
2307         rbd_dev->header.snapc = snapc;
2308
2309         dout("  snap context seq = %llu, snap_count = %u\n",
2310                 (unsigned long long) seq, (unsigned int) snap_count);
2311
2312 out:
2313         kfree(reply_buf);
2314
2315         return 0;
2316 }
2317
2318 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2319 {
2320         size_t size;
2321         void *reply_buf;
2322         __le64 snap_id;
2323         int ret;
2324         void *p;
2325         void *end;
2326         size_t snap_name_len;
2327         char *snap_name;
2328
2329         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2330         reply_buf = kmalloc(size, GFP_KERNEL);
2331         if (!reply_buf)
2332                 return ERR_PTR(-ENOMEM);
2333
2334         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2335         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2336                                 "rbd", "get_snapshot_name",
2337                                 (char *) &snap_id, sizeof (snap_id),
2338                                 reply_buf, size,
2339                                 CEPH_OSD_FLAG_READ, NULL);
2340         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2341         if (ret < 0)
2342                 goto out;
2343
2344         p = reply_buf;
2345         end = (char *) reply_buf + size;
2346         snap_name_len = 0;
2347         snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2348                                 GFP_KERNEL);
2349         if (IS_ERR(snap_name)) {
2350                 ret = PTR_ERR(snap_name);
2351                 goto out;
2352         } else {
2353                 dout("  snap_id 0x%016llx snap_name = %s\n",
2354                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2355         }
2356         kfree(reply_buf);
2357
2358         return snap_name;
2359 out:
2360         kfree(reply_buf);
2361
2362         return ERR_PTR(ret);
2363 }
2364
2365 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2366                 u64 *snap_size, u64 *snap_features)
2367 {
2368         __le64 snap_id;
2369         u8 order;
2370         int ret;
2371
2372         snap_id = rbd_dev->header.snapc->snaps[which];
2373         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2374         if (ret)
2375                 return ERR_PTR(ret);
2376         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2377         if (ret)
2378                 return ERR_PTR(ret);
2379
2380         return rbd_dev_v2_snap_name(rbd_dev, which);
2381 }
2382
2383 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2384                 u64 *snap_size, u64 *snap_features)
2385 {
2386         if (rbd_dev->image_format == 1)
2387                 return rbd_dev_v1_snap_info(rbd_dev, which,
2388                                         snap_size, snap_features);
2389         if (rbd_dev->image_format == 2)
2390                 return rbd_dev_v2_snap_info(rbd_dev, which,
2391                                         snap_size, snap_features);
2392         return ERR_PTR(-EINVAL);
2393 }
2394
2395 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2396 {
2397         int ret;
2398         __u8 obj_order;
2399
2400         down_write(&rbd_dev->header_rwsem);
2401
2402         /* Grab old order first, to see if it changes */
2403
2404         obj_order = rbd_dev->header.obj_order,
2405         ret = rbd_dev_v2_image_size(rbd_dev);
2406         if (ret)
2407                 goto out;
2408         if (rbd_dev->header.obj_order != obj_order) {
2409                 ret = -EIO;
2410                 goto out;
2411         }
2412         rbd_update_mapping_size(rbd_dev);
2413
2414         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2415         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2416         if (ret)
2417                 goto out;
2418         ret = rbd_dev_snaps_update(rbd_dev);
2419         dout("rbd_dev_snaps_update returned %d\n", ret);
2420         if (ret)
2421                 goto out;
2422         ret = rbd_dev_snaps_register(rbd_dev);
2423         dout("rbd_dev_snaps_register returned %d\n", ret);
2424 out:
2425         up_write(&rbd_dev->header_rwsem);
2426
2427         return ret;
2428 }
2429
2430 /*
2431  * Scan the rbd device's current snapshot list and compare it to the
2432  * newly-received snapshot context.  Remove any existing snapshots
2433  * not present in the new snapshot context.  Add a new snapshot for
2434  * any snaphots in the snapshot context not in the current list.
2435  * And verify there are no changes to snapshots we already know
2436  * about.
2437  *
2438  * Assumes the snapshots in the snapshot context are sorted by
2439  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2440  * are also maintained in that order.)
2441  */
2442 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2443 {
2444         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2445         const u32 snap_count = snapc->num_snaps;
2446         struct list_head *head = &rbd_dev->snaps;
2447         struct list_head *links = head->next;
2448         u32 index = 0;
2449
2450         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2451         while (index < snap_count || links != head) {
2452                 u64 snap_id;
2453                 struct rbd_snap *snap;
2454                 char *snap_name;
2455                 u64 snap_size = 0;
2456                 u64 snap_features = 0;
2457
2458                 snap_id = index < snap_count ? snapc->snaps[index]
2459                                              : CEPH_NOSNAP;
2460                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2461                                      : NULL;
2462                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2463
2464                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2465                         struct list_head *next = links->next;
2466
2467                         /* Existing snapshot not in the new snap context */
2468
2469                         if (rbd_dev->mapping.snap_id == snap->id)
2470                                 rbd_dev->mapping.snap_exists = false;
2471                         __rbd_remove_snap_dev(snap);
2472                         dout("%ssnap id %llu has been removed\n",
2473                                 rbd_dev->mapping.snap_id == snap->id ?
2474                                                                 "mapped " : "",
2475                                 (unsigned long long) snap->id);
2476
2477                         /* Done with this list entry; advance */
2478
2479                         links = next;
2480                         continue;
2481                 }
2482
2483                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2484                                         &snap_size, &snap_features);
2485                 if (IS_ERR(snap_name))
2486                         return PTR_ERR(snap_name);
2487
2488                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2489                         (unsigned long long) snap_id);
2490                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2491                         struct rbd_snap *new_snap;
2492
2493                         /* We haven't seen this snapshot before */
2494
2495                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2496                                         snap_id, snap_size, snap_features);
2497                         if (IS_ERR(new_snap)) {
2498                                 int err = PTR_ERR(new_snap);
2499
2500                                 dout("  failed to add dev, error %d\n", err);
2501
2502                                 return err;
2503                         }
2504
2505                         /* New goes before existing, or at end of list */
2506
2507                         dout("  added dev%s\n", snap ? "" : " at end\n");
2508                         if (snap)
2509                                 list_add_tail(&new_snap->node, &snap->node);
2510                         else
2511                                 list_add_tail(&new_snap->node, head);
2512                 } else {
2513                         /* Already have this one */
2514
2515                         dout("  already present\n");
2516
2517                         rbd_assert(snap->size == snap_size);
2518                         rbd_assert(!strcmp(snap->name, snap_name));
2519                         rbd_assert(snap->features == snap_features);
2520
2521                         /* Done with this list entry; advance */
2522
2523                         links = links->next;
2524                 }
2525
2526                 /* Advance to the next entry in the snapshot context */
2527
2528                 index++;
2529         }
2530         dout("%s: done\n", __func__);
2531
2532         return 0;
2533 }
2534
2535 /*
2536  * Scan the list of snapshots and register the devices for any that
2537  * have not already been registered.
2538  */
2539 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2540 {
2541         struct rbd_snap *snap;
2542         int ret = 0;
2543
2544         dout("%s called\n", __func__);
2545         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2546                 return -EIO;
2547
2548         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2549                 if (!rbd_snap_registered(snap)) {
2550                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2551                         if (ret < 0)
2552                                 break;
2553                 }
2554         }
2555         dout("%s: returning %d\n", __func__, ret);
2556
2557         return ret;
2558 }
2559
2560 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2561 {
2562         struct device *dev;
2563         int ret;
2564
2565         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2566
2567         dev = &rbd_dev->dev;
2568         dev->bus = &rbd_bus_type;
2569         dev->type = &rbd_device_type;
2570         dev->parent = &rbd_root_dev;
2571         dev->release = rbd_dev_release;
2572         dev_set_name(dev, "%d", rbd_dev->dev_id);
2573         ret = device_register(dev);
2574
2575         mutex_unlock(&ctl_mutex);
2576
2577         return ret;
2578 }
2579
2580 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2581 {
2582         device_unregister(&rbd_dev->dev);
2583 }
2584
2585 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2586 {
2587         int ret, rc;
2588
2589         do {
2590                 ret = rbd_req_sync_watch(rbd_dev);
2591                 if (ret == -ERANGE) {
2592                         rc = rbd_dev_refresh(rbd_dev, NULL);
2593                         if (rc < 0)
2594                                 return rc;
2595                 }
2596         } while (ret == -ERANGE);
2597
2598         return ret;
2599 }
2600
/*
 * Highest rbd device id handed out so far; incremented by
 * rbd_dev_id_get() and lowered again by rbd_dev_id_put().
 */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2602
2603 /*
2604  * Get a unique rbd identifier for the given new rbd_dev, and add
2605  * the rbd_dev to the global list.  The minimum rbd id is 1.
2606  */
2607 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2608 {
2609         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2610
2611         spin_lock(&rbd_dev_list_lock);
2612         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2613         spin_unlock(&rbd_dev_list_lock);
2614         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2615                 (unsigned long long) rbd_dev->dev_id);
2616 }
2617
2618 /*
2619  * Remove an rbd_dev from the global list, and record that its
2620  * identifier is no longer in use.
2621  */
2622 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2623 {
2624         struct list_head *tmp;
2625         int rbd_id = rbd_dev->dev_id;
2626         int max_id;
2627
2628         rbd_assert(rbd_id > 0);
2629
2630         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2631                 (unsigned long long) rbd_dev->dev_id);
2632         spin_lock(&rbd_dev_list_lock);
2633         list_del_init(&rbd_dev->node);
2634
2635         /*
2636          * If the id being "put" is not the current maximum, there
2637          * is nothing special we need to do.
2638          */
2639         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2640                 spin_unlock(&rbd_dev_list_lock);
2641                 return;
2642         }
2643
2644         /*
2645          * We need to update the current maximum id.  Search the
2646          * list to find out what it is.  We're more likely to find
2647          * the maximum at the end, so search the list backward.
2648          */
2649         max_id = 0;
2650         list_for_each_prev(tmp, &rbd_dev_list) {
2651                 struct rbd_device *rbd_dev;
2652
2653                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2654                 if (rbd_dev->dev_id > max_id)
2655                         max_id = rbd_dev->dev_id;
2656         }
2657         spin_unlock(&rbd_dev_list_lock);
2658
2659         /*
2660          * The max id could have been updated by rbd_dev_id_get(), in
2661          * which case it now accurately reflects the new maximum.
2662          * Be careful not to overwrite the maximum value in that
2663          * case.
2664          */
2665         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2666         dout("  max dev id has been reset\n");
2667 }
2668
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none).  Returns the length of the token (run of
 * non-white space characters) that starts there, which is 0 when
 * no token remains.  Note that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *token_start;

	token_start = *buf + strspn(*buf, whitespace);
	*buf = token_start;

	return strcspn(token_start, whitespace);
}
2687
2688 /*
2689  * Finds the next token in *buf, and if the provided token buffer is
2690  * big enough, copies the found token into it.  The result, if
2691  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2692  * must be terminated with '\0' on entry.
2693  *
2694  * Returns the length of the token found (not including the '\0').
2695  * Return value will be 0 if no token is found, and it will be >=
2696  * token_size if the token would not fit.
2697  *
2698  * The *buf pointer will be updated to point beyond the end of the
2699  * found token.  Note that this occurs even if the token buffer is
2700  * too small to hold it.
2701  */
2702 static inline size_t copy_token(const char **buf,
2703                                 char *token,
2704                                 size_t token_size)
2705 {
2706         size_t len;
2707
2708         len = next_token(buf);
2709         if (len < token_size) {
2710                 memcpy(token, *buf, len);
2711                 *(token + len) = '\0';
2712         }
2713         *buf += len;
2714
2715         return len;
2716 }
2717
2718 /*
2719  * Finds the next token in *buf, dynamically allocates a buffer big
2720  * enough to hold a copy of it, and copies the token into the new
2721  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2722  * that a duplicate buffer is created even for a zero-length token.
2723  *
2724  * Returns a pointer to the newly-allocated duplicate, or a null
2725  * pointer if memory for the duplicate was not available.  If
2726  * the lenp argument is a non-null pointer, the length of the token
2727  * (not including the '\0') is returned in *lenp.
2728  *
2729  * If successful, the *buf pointer will be updated to point beyond
2730  * the end of the found token.
2731  *
2732  * Note: uses GFP_KERNEL for allocation.
2733  */
2734 static inline char *dup_token(const char **buf, size_t *lenp)
2735 {
2736         char *dup;
2737         size_t len;
2738
2739         len = next_token(buf);
2740         dup = kmalloc(len + 1, GFP_KERNEL);
2741         if (!dup)
2742                 return NULL;
2743
2744         memcpy(dup, *buf, len);
2745         *(dup + len) = '\0';
2746         *buf += len;
2747
2748         if (lenp)
2749                 *lenp = len;
2750
2751         return dup;
2752 }
2753
2754 /*
2755  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2756  * rbd_md_name, and name fields of the given rbd_dev, based on the
2757  * list of monitor addresses and other options provided via
2758  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2759  * copy of the snapshot name to map if successful, or a
2760  * pointer-coded error otherwise.
2761  *
2762  * Note: rbd_dev is assumed to have been initially zero-filled.
2763  */
2764 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2765                                 const char *buf,
2766                                 const char **mon_addrs,
2767                                 size_t *mon_addrs_size,
2768                                 char *options,
2769                                 size_t options_size)
2770 {
2771         size_t len;
2772         char *err_ptr = ERR_PTR(-EINVAL);
2773         char *snap_name;
2774
2775         /* The first four tokens are required */
2776
2777         len = next_token(&buf);
2778         if (!len)
2779                 return err_ptr;
2780         *mon_addrs_size = len + 1;
2781         *mon_addrs = buf;
2782
2783         buf += len;
2784
2785         len = copy_token(&buf, options, options_size);
2786         if (!len || len >= options_size)
2787                 return err_ptr;
2788
2789         err_ptr = ERR_PTR(-ENOMEM);
2790         rbd_dev->pool_name = dup_token(&buf, NULL);
2791         if (!rbd_dev->pool_name)
2792                 goto out_err;
2793
2794         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2795         if (!rbd_dev->image_name)
2796                 goto out_err;
2797
2798         /* Snapshot name is optional */
2799         len = next_token(&buf);
2800         if (!len) {
2801                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2802                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2803         }
2804         snap_name = kmalloc(len + 1, GFP_KERNEL);
2805         if (!snap_name)
2806                 goto out_err;
2807         memcpy(snap_name, buf, len);
2808         *(snap_name + len) = '\0';
2809
2810 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2811
2812         return snap_name;
2813
2814 out_err:
2815         kfree(rbd_dev->image_name);
2816         rbd_dev->image_name = NULL;
2817         rbd_dev->image_name_len = 0;
2818         kfree(rbd_dev->pool_name);
2819         rbd_dev->pool_name = NULL;
2820
2821         return err_ptr;
2822 }
2823
2824 /*
2825  * An rbd format 2 image has a unique identifier, distinct from the
2826  * name given to it by the user.  Internally, that identifier is
2827  * what's used to specify the names of objects related to the image.
2828  *
2829  * A special "rbd id" object is used to map an rbd image name to its
2830  * id.  If that object doesn't exist, then there is no v2 rbd image
2831  * with the supplied name.
2832  *
2833  * This function will record the given rbd_dev's image_id field if
2834  * it can be determined, and in that case will return 0.  If any
2835  * errors occur a negative errno will be returned and the rbd_dev's
2836  * image_id field will be unchanged (and should be NULL).
2837  */
2838 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2839 {
2840         int ret;
2841         size_t size;
2842         char *object_name;
2843         void *response;
2844         void *p;
2845
2846         /*
2847          * First, see if the format 2 image id file exists, and if
2848          * so, get the image's persistent id from it.
2849          */
2850         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2851         object_name = kmalloc(size, GFP_NOIO);
2852         if (!object_name)
2853                 return -ENOMEM;
2854         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2855         dout("rbd id object name is %s\n", object_name);
2856
2857         /* Response will be an encoded string, which includes a length */
2858
2859         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2860         response = kzalloc(size, GFP_NOIO);
2861         if (!response) {
2862                 ret = -ENOMEM;
2863                 goto out;
2864         }
2865
2866         ret = rbd_req_sync_exec(rbd_dev, object_name,
2867                                 "rbd", "get_id",
2868                                 NULL, 0,
2869                                 response, RBD_IMAGE_ID_LEN_MAX,
2870                                 CEPH_OSD_FLAG_READ, NULL);
2871         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2872         if (ret < 0)
2873                 goto out;
2874         ret = 0;    /* rbd_req_sync_exec() can return positive */
2875
2876         p = response;
2877         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2878                                                 p + RBD_IMAGE_ID_LEN_MAX,
2879                                                 &rbd_dev->image_id_len,
2880                                                 GFP_NOIO);
2881         if (IS_ERR(rbd_dev->image_id)) {
2882                 ret = PTR_ERR(rbd_dev->image_id);
2883                 rbd_dev->image_id = NULL;
2884         } else {
2885                 dout("image_id is %s\n", rbd_dev->image_id);
2886         }
2887 out:
2888         kfree(response);
2889         kfree(object_name);
2890
2891         return ret;
2892 }
2893
2894 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2895 {
2896         int ret;
2897         size_t size;
2898
2899         /* Version 1 images have no id; empty string is used */
2900
2901         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2902         if (!rbd_dev->image_id)
2903                 return -ENOMEM;
2904         rbd_dev->image_id_len = 0;
2905
2906         /* Record the header object name for this rbd image. */
2907
2908         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2909         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2910         if (!rbd_dev->header_name) {
2911                 ret = -ENOMEM;
2912                 goto out_err;
2913         }
2914         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2915
2916         /* Populate rbd image metadata */
2917
2918         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2919         if (ret < 0)
2920                 goto out_err;
2921         rbd_dev->image_format = 1;
2922
2923         dout("discovered version 1 image, header name is %s\n",
2924                 rbd_dev->header_name);
2925
2926         return 0;
2927
2928 out_err:
2929         kfree(rbd_dev->header_name);
2930         rbd_dev->header_name = NULL;
2931         kfree(rbd_dev->image_id);
2932         rbd_dev->image_id = NULL;
2933
2934         return ret;
2935 }
2936
2937 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2938 {
2939         size_t size;
2940         int ret;
2941         u64 ver = 0;
2942
2943         /*
2944          * Image id was filled in by the caller.  Record the header
2945          * object name for this rbd image.
2946          */
2947         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2948         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2949         if (!rbd_dev->header_name)
2950                 return -ENOMEM;
2951         sprintf(rbd_dev->header_name, "%s%s",
2952                         RBD_HEADER_PREFIX, rbd_dev->image_id);
2953
2954         /* Get the size and object order for the image */
2955
2956         ret = rbd_dev_v2_image_size(rbd_dev);
2957         if (ret < 0)
2958                 goto out_err;
2959
2960         /* Get the object prefix (a.k.a. block_name) for the image */
2961
2962         ret = rbd_dev_v2_object_prefix(rbd_dev);
2963         if (ret < 0)
2964                 goto out_err;
2965
2966         /* Get the and check features for the image */
2967
2968         ret = rbd_dev_v2_features(rbd_dev);
2969         if (ret < 0)
2970                 goto out_err;
2971
2972         /* crypto and compression type aren't (yet) supported for v2 images */
2973
2974         rbd_dev->header.crypt_type = 0;
2975         rbd_dev->header.comp_type = 0;
2976
2977         /* Get the snapshot context, plus the header version */
2978
2979         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2980         if (ret)
2981                 goto out_err;
2982         rbd_dev->header.obj_version = ver;
2983
2984         rbd_dev->image_format = 2;
2985
2986         dout("discovered version 2 image, header name is %s\n",
2987                 rbd_dev->header_name);
2988
2989         return 0;
2990 out_err:
2991         kfree(rbd_dev->header_name);
2992         rbd_dev->header_name = NULL;
2993         kfree(rbd_dev->header.object_prefix);
2994         rbd_dev->header.object_prefix = NULL;
2995
2996         return ret;
2997 }
2998
/*
 * Probe for the header object of the given rbd device and work out
 * which image format it uses.
 *
 * The image id object is looked up first.  If that succeeds the
 * image is probed as format 2.  Any failure of the lookup (the
 * expected one being ENOENT when no id object exists) makes us fall
 * back to probing it as a format 1 image.
 *
 * Returns the result of whichever format-specific probe was run.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	if (rbd_dev_image_id(rbd_dev) == 0)
		ret = rbd_dev_v2_probe(rbd_dev);
	else
		ret = rbd_dev_v1_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3023
3024 static ssize_t rbd_add(struct bus_type *bus,
3025                        const char *buf,
3026                        size_t count)
3027 {
3028         char *options;
3029         struct rbd_device *rbd_dev = NULL;
3030         const char *mon_addrs = NULL;
3031         size_t mon_addrs_size = 0;
3032         struct ceph_osd_client *osdc;
3033         int rc = -ENOMEM;
3034         char *snap_name;
3035
3036         if (!try_module_get(THIS_MODULE))
3037                 return -ENODEV;
3038
3039         options = kmalloc(count, GFP_KERNEL);
3040         if (!options)
3041                 goto err_out_mem;
3042         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3043         if (!rbd_dev)
3044                 goto err_out_mem;
3045
3046         /* static rbd_device initialization */
3047         spin_lock_init(&rbd_dev->lock);
3048         INIT_LIST_HEAD(&rbd_dev->node);
3049         INIT_LIST_HEAD(&rbd_dev->snaps);
3050         init_rwsem(&rbd_dev->header_rwsem);
3051
3052         /* parse add command */
3053         snap_name = rbd_add_parse_args(rbd_dev, buf,
3054                                 &mon_addrs, &mon_addrs_size, options, count);
3055         if (IS_ERR(snap_name)) {
3056                 rc = PTR_ERR(snap_name);
3057                 goto err_out_mem;
3058         }
3059
3060         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3061         if (rc < 0)
3062                 goto err_out_args;
3063
3064         /* pick the pool */
3065         osdc = &rbd_dev->rbd_client->client->osdc;
3066         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3067         if (rc < 0)
3068                 goto err_out_client;
3069         rbd_dev->pool_id = rc;
3070
3071         rc = rbd_dev_probe(rbd_dev);
3072         if (rc < 0)
3073                 goto err_out_client;
3074
3075         /* no need to lock here, as rbd_dev is not registered yet */
3076         rc = rbd_dev_snaps_update(rbd_dev);
3077         if (rc)
3078                 goto err_out_header;
3079
3080         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3081         if (rc)
3082                 goto err_out_header;
3083
3084         /* generate unique id: find highest unique id, add one */
3085         rbd_dev_id_get(rbd_dev);
3086
3087         /* Fill in the device name, now that we have its id. */
3088         BUILD_BUG_ON(DEV_NAME_LEN
3089                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3090         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3091
3092         /* Get our block major device number. */
3093
3094         rc = register_blkdev(0, rbd_dev->name);
3095         if (rc < 0)
3096                 goto err_out_id;
3097         rbd_dev->major = rc;
3098
3099         /* Set up the blkdev mapping. */
3100
3101         rc = rbd_init_disk(rbd_dev);
3102         if (rc)
3103                 goto err_out_blkdev;
3104
3105         rc = rbd_bus_add_dev(rbd_dev);
3106         if (rc)
3107                 goto err_out_disk;
3108
3109         /*
3110          * At this point cleanup in the event of an error is the job
3111          * of the sysfs code (initiated by rbd_bus_del_dev()).
3112          */
3113
3114         down_write(&rbd_dev->header_rwsem);
3115         rc = rbd_dev_snaps_register(rbd_dev);
3116         up_write(&rbd_dev->header_rwsem);
3117         if (rc)
3118                 goto err_out_bus;
3119
3120         rc = rbd_init_watch_dev(rbd_dev);
3121         if (rc)
3122                 goto err_out_bus;
3123
3124         /* Everything's ready.  Announce the disk to the world. */
3125
3126         add_disk(rbd_dev->disk);
3127
3128         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3129                 (unsigned long long) rbd_dev->mapping.size);
3130
3131         return count;
3132
3133 err_out_bus:
3134         /* this will also clean up rest of rbd_dev stuff */
3135
3136         rbd_bus_del_dev(rbd_dev);
3137         kfree(options);
3138         return rc;
3139
3140 err_out_disk:
3141         rbd_free_disk(rbd_dev);
3142 err_out_blkdev:
3143         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3144 err_out_id:
3145         rbd_dev_id_put(rbd_dev);
3146 err_out_header:
3147         rbd_header_free(&rbd_dev->header);
3148 err_out_client:
3149         kfree(rbd_dev->header_name);
3150         rbd_put_client(rbd_dev);
3151         kfree(rbd_dev->image_id);
3152 err_out_args:
3153         kfree(rbd_dev->mapping.snap_name);
3154         kfree(rbd_dev->image_name);
3155         kfree(rbd_dev->pool_name);
3156 err_out_mem:
3157         kfree(rbd_dev);
3158         kfree(options);
3159
3160         dout("Error adding device %s\n", buf);
3161         module_put(THIS_MODULE);
3162
3163         return (ssize_t) rc;
3164 }
3165
3166 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3167 {
3168         struct list_head *tmp;
3169         struct rbd_device *rbd_dev;
3170
3171         spin_lock(&rbd_dev_list_lock);
3172         list_for_each(tmp, &rbd_dev_list) {
3173                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3174                 if (rbd_dev->dev_id == dev_id) {
3175                         spin_unlock(&rbd_dev_list_lock);
3176                         return rbd_dev;
3177                 }
3178         }
3179         spin_unlock(&rbd_dev_list_lock);
3180         return NULL;
3181 }
3182
3183 static void rbd_dev_release(struct device *dev)
3184 {
3185         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
3187         if (rbd_dev->watch_request) {
3188                 struct ceph_client *client = rbd_dev->rbd_client->client;
3189
3190                 ceph_osdc_unregister_linger_request(&client->osdc,
3191                                                     rbd_dev->watch_request);
3192         }
3193         if (rbd_dev->watch_event)
3194                 rbd_req_sync_unwatch(rbd_dev);
3195
3196         rbd_put_client(rbd_dev);
3197
3198         /* clean up and free blkdev */
3199         rbd_free_disk(rbd_dev);
3200         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3201
3202         /* release allocated disk header fields */
3203         rbd_header_free(&rbd_dev->header);
3204
3205         /* done with the id, and with the rbd_dev */
3206         kfree(rbd_dev->mapping.snap_name);
3207         kfree(rbd_dev->image_id);
3208         kfree(rbd_dev->header_name);
3209         kfree(rbd_dev->pool_name);
3210         kfree(rbd_dev->image_name);
3211         rbd_dev_id_put(rbd_dev);
3212         kfree(rbd_dev);
3213
3214         /* release module ref */
3215         module_put(THIS_MODULE);
3216 }
3217
3218 static ssize_t rbd_remove(struct bus_type *bus,
3219                           const char *buf,
3220                           size_t count)
3221 {
3222         struct rbd_device *rbd_dev = NULL;
3223         int target_id, rc;
3224         unsigned long ul;
3225         int ret = count;
3226
3227         rc = strict_strtoul(buf, 10, &ul);
3228         if (rc)
3229                 return rc;
3230
3231         /* convert to int; abort if we lost anything in the conversion */
3232         target_id = (int) ul;
3233         if (target_id != ul)
3234                 return -EINVAL;
3235
3236         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3237
3238         rbd_dev = __rbd_get_dev(target_id);
3239         if (!rbd_dev) {
3240                 ret = -ENOENT;
3241                 goto done;
3242         }
3243
3244         __rbd_remove_all_snaps(rbd_dev);
3245         rbd_bus_del_dev(rbd_dev);
3246
3247 done:
3248         mutex_unlock(&ctl_mutex);
3249
3250         return ret;
3251 }
3252
3253 /*
3254  * create control files in sysfs
3255  * /sys/bus/rbd/...
3256  */
3257 static int rbd_sysfs_init(void)
3258 {
3259         int ret;
3260
3261         ret = device_register(&rbd_root_dev);
3262         if (ret < 0)
3263                 return ret;
3264
3265         ret = bus_register(&rbd_bus_type);
3266         if (ret < 0)
3267                 device_unregister(&rbd_root_dev);
3268
3269         return ret;
3270 }
3271
3272 static void rbd_sysfs_cleanup(void)
3273 {
3274         bus_unregister(&rbd_bus_type);
3275         device_unregister(&rbd_root_dev);
3276 }
3277
3278 int __init rbd_init(void)
3279 {
3280         int rc;
3281
3282         rc = rbd_sysfs_init();
3283         if (rc)
3284                 return rc;
3285         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3286         return 0;
3287 }
3288
3289 void __exit rbd_exit(void)
3290 {
3291         rbd_sysfs_cleanup();
3292 }
3293
/* Register the module's load and unload entry points */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");