]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
68447d83288cc60f6d5276eb6664adfa2ac46f1f
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
98 struct rbd_image_header {
99         /* These four fields never change for a given rbd image */
100         char *object_prefix;
101         u64 features;
102         __u8 obj_order;
103         __u8 crypt_type;
104         __u8 comp_type;
105
106         /* The remaining fields need to be updated occasionally */
107         u64 image_size;
108         struct ceph_snap_context *snapc;
109         char *snap_names;
110         u64 *snap_sizes;
111
112         u64 obj_version;
113 };
114
115 struct rbd_options {
116         bool    read_only;
117 };
118
119 /*
120  * an instance of the client.  multiple devices may share an rbd client.
121  */
122 struct rbd_client {
123         struct ceph_client      *client;
124         struct kref             kref;
125         struct list_head        node;
126 };
127
128 /*
129  * a request completion status
130  */
131 struct rbd_req_status {
132         int done;
133         int rc;
134         u64 bytes;
135 };
136
137 /*
138  * a collection of requests
139  */
140 struct rbd_req_coll {
141         int                     total;
142         int                     num_done;
143         struct kref             kref;
144         struct rbd_req_status   status[0];
145 };
146
147 /*
148  * a single io request
149  */
150 struct rbd_request {
151         struct request          *rq;            /* blk layer request */
152         struct bio              *bio;           /* cloned bio */
153         struct page             **pages;        /* list of used pages */
154         u64                     len;
155         int                     coll_index;
156         struct rbd_req_coll     *coll;
157 };
158
159 struct rbd_snap {
160         struct  device          dev;
161         const char              *name;
162         u64                     size;
163         struct list_head        node;
164         u64                     id;
165         u64                     features;
166 };
167
168 struct rbd_mapping {
169         u64                     size;
170         u64                     features;
171         bool                    read_only;
172 };
173
174 /*
175  * a single device
176  */
177 struct rbd_device {
178         int                     dev_id;         /* blkdev unique id */
179
180         int                     major;          /* blkdev assigned major */
181         struct gendisk          *disk;          /* blkdev's gendisk and rq */
182
183         u32                     image_format;   /* Either 1 or 2 */
184         struct rbd_client       *rbd_client;
185
186         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
187
188         spinlock_t              lock;           /* queue lock */
189
190         struct rbd_image_header header;
191         bool                    exists;
192         char                    *image_id;
193         size_t                  image_id_len;
194         char                    *image_name;
195         size_t                  image_name_len;
196         char                    *header_name;
197         char                    *pool_name;
198         u64                     pool_id;
199
200         char                    *snap_name;
201         u64                     snap_id;
202
203         struct ceph_osd_event   *watch_event;
204         struct ceph_osd_request *watch_request;
205
206         /* protects updating the header */
207         struct rw_semaphore     header_rwsem;
208
209         struct rbd_mapping      mapping;
210
211         struct list_head        node;
212
213         /* list of snapshots */
214         struct list_head        snaps;
215
216         /* sysfs related */
217         struct device           dev;
218 };
219
220 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
221
222 static LIST_HEAD(rbd_dev_list);    /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
224
225 static LIST_HEAD(rbd_client_list);              /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
227
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
230
231 static void rbd_dev_release(struct device *dev);
232 static void rbd_remove_snap_dev(struct rbd_snap *snap);
233
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
235                        size_t count);
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
237                           size_t count);
238
239 static struct bus_attribute rbd_bus_attrs[] = {
240         __ATTR(add, S_IWUSR, NULL, rbd_add),
241         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
242         __ATTR_NULL
243 };
244
245 static struct bus_type rbd_bus_type = {
246         .name           = "rbd",
247         .bus_attrs      = rbd_bus_attrs,
248 };
249
/* Release callback installed on rbd_root_dev; it does nothing. */
static void rbd_root_dev_release(struct device *dev)
{
}
253
254 static struct device rbd_root_dev = {
255         .init_name =    "rbd",
256         .release =      rbd_root_dev_release,
257 };
258
/*
 * rbd_assert(expr)
 *
 * With RBD_DEBUG defined: if expr is false, log the function name,
 * line number and the text of the expression, then BUG().  Without
 * RBD_DEBUG the macro expands to a no-op and expr is not evaluated.
 *
 * The checking variant is wrapped in do { } while (0) so that it
 * expands to exactly one statement; a bare "if (...) { ... }" would
 * capture a following "else" or fail to compile when the macro is
 * used as the body of an unbraced if/else.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
271
272 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
273 {
274         return get_device(&rbd_dev->dev);
275 }
276
277 static void rbd_put_dev(struct rbd_device *rbd_dev)
278 {
279         put_device(&rbd_dev->dev);
280 }
281
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
284
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
286 {
287         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
288
289         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
290                 return -EROFS;
291
292         rbd_get_dev(rbd_dev);
293         set_device_ro(bdev, rbd_dev->mapping.read_only);
294
295         return 0;
296 }
297
298 static int rbd_release(struct gendisk *disk, fmode_t mode)
299 {
300         struct rbd_device *rbd_dev = disk->private_data;
301
302         rbd_put_dev(rbd_dev);
303
304         return 0;
305 }
306
307 static const struct block_device_operations rbd_bd_ops = {
308         .owner                  = THIS_MODULE,
309         .open                   = rbd_open,
310         .release                = rbd_release,
311 };
312
313 /*
314  * Initialize an rbd client instance.
315  * We own *ceph_opts.
316  */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 {
319         struct rbd_client *rbdc;
320         int ret = -ENOMEM;
321
322         dout("rbd_client_create\n");
323         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
324         if (!rbdc)
325                 goto out_opt;
326
327         kref_init(&rbdc->kref);
328         INIT_LIST_HEAD(&rbdc->node);
329
330         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331
332         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333         if (IS_ERR(rbdc->client))
334                 goto out_mutex;
335         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336
337         ret = ceph_open_session(rbdc->client);
338         if (ret < 0)
339                 goto out_err;
340
341         spin_lock(&rbd_client_list_lock);
342         list_add_tail(&rbdc->node, &rbd_client_list);
343         spin_unlock(&rbd_client_list_lock);
344
345         mutex_unlock(&ctl_mutex);
346
347         dout("rbd_client_create created %p\n", rbdc);
348         return rbdc;
349
350 out_err:
351         ceph_destroy_client(rbdc->client);
352 out_mutex:
353         mutex_unlock(&ctl_mutex);
354         kfree(rbdc);
355 out_opt:
356         if (ceph_opts)
357                 ceph_destroy_options(ceph_opts);
358         return ERR_PTR(ret);
359 }
360
361 /*
362  * Find a ceph client with specific addr and configuration.  If
363  * found, bump its reference count.
364  */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
366 {
367         struct rbd_client *client_node;
368         bool found = false;
369
370         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
371                 return NULL;
372
373         spin_lock(&rbd_client_list_lock);
374         list_for_each_entry(client_node, &rbd_client_list, node) {
375                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376                         kref_get(&client_node->kref);
377                         found = true;
378                         break;
379                 }
380         }
381         spin_unlock(&rbd_client_list_lock);
382
383         return found ? client_node : NULL;
384 }
385
/*
 * Mount option tokens.  The Opt_last_* markers split the token space
 * into classes: tokens below Opt_last_int take an int argument, those
 * between Opt_last_int and Opt_last_string take a string, and those
 * between Opt_last_string and Opt_last_bool are Boolean flags.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};
399
400 static match_table_t rbd_opts_tokens = {
401         /* int args above */
402         /* string args above */
403         {Opt_read_only, "read_only"},
404         {Opt_read_only, "ro"},          /* Alternate spelling */
405         {Opt_read_write, "read_write"},
406         {Opt_read_write, "rw"},         /* Alternate spelling */
407         /* Boolean args above */
408         {-1, NULL}
409 };
410
411 static int parse_rbd_opts_token(char *c, void *private)
412 {
413         struct rbd_options *rbd_opts = private;
414         substring_t argstr[MAX_OPT_ARGS];
415         int token, intval, ret;
416
417         token = match_token(c, rbd_opts_tokens, argstr);
418         if (token < 0)
419                 return -EINVAL;
420
421         if (token < Opt_last_int) {
422                 ret = match_int(&argstr[0], &intval);
423                 if (ret < 0) {
424                         pr_err("bad mount option arg (not int) "
425                                "at '%s'\n", c);
426                         return ret;
427                 }
428                 dout("got int token %d val %d\n", token, intval);
429         } else if (token > Opt_last_int && token < Opt_last_string) {
430                 dout("got string token %d val %s\n", token,
431                      argstr[0].from);
432         } else if (token > Opt_last_string && token < Opt_last_bool) {
433                 dout("got Boolean token %d\n", token);
434         } else {
435                 dout("got token %d\n", token);
436         }
437
438         switch (token) {
439         case Opt_read_only:
440                 rbd_opts->read_only = true;
441                 break;
442         case Opt_read_write:
443                 rbd_opts->read_only = false;
444                 break;
445         default:
446                 rbd_assert(false);
447                 break;
448         }
449         return 0;
450 }
451
452 /*
453  * Get a ceph client with specific addr and configuration, if one does
454  * not exist create it.
455  */
456 static int rbd_get_client(struct rbd_device *rbd_dev,
457                                 struct ceph_options *ceph_opts)
458 {
459         struct rbd_client *rbdc;
460
461         rbdc = rbd_client_find(ceph_opts);
462         if (rbdc) {
463                 /* using an existing client */
464                 ceph_destroy_options(ceph_opts);
465         } else {
466                 rbdc = rbd_client_create(ceph_opts);
467                 if (IS_ERR(rbdc))
468                         return PTR_ERR(rbdc);
469         }
470         rbd_dev->rbd_client = rbdc;
471
472         return 0;
473 }
474
475 /*
476  * Destroy ceph client
477  *
478  * Caller must hold rbd_client_list_lock.
479  */
480 static void rbd_client_release(struct kref *kref)
481 {
482         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
483
484         dout("rbd_release_client %p\n", rbdc);
485         spin_lock(&rbd_client_list_lock);
486         list_del(&rbdc->node);
487         spin_unlock(&rbd_client_list_lock);
488
489         ceph_destroy_client(rbdc->client);
490         kfree(rbdc);
491 }
492
493 /*
494  * Drop reference to ceph client node. If it's not referenced anymore, release
495  * it.
496  */
497 static void rbd_put_client(struct rbd_device *rbd_dev)
498 {
499         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
500         rbd_dev->rbd_client = NULL;
501 }
502
503 /*
504  * Destroy requests collection
505  */
506 static void rbd_coll_release(struct kref *kref)
507 {
508         struct rbd_req_coll *coll =
509                 container_of(kref, struct rbd_req_coll, kref);
510
511         dout("rbd_coll_release %p\n", coll);
512         kfree(coll);
513 }
514
515 static bool rbd_image_format_valid(u32 image_format)
516 {
517         return image_format == 1 || image_format == 2;
518 }
519
520 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
521 {
522         size_t size;
523         u32 snap_count;
524
525         /* The header has to start with the magic rbd header text */
526         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
527                 return false;
528
529         /* The bio layer requires at least sector-sized I/O */
530
531         if (ondisk->options.order < SECTOR_SHIFT)
532                 return false;
533
534         /* If we use u64 in a few spots we may be able to loosen this */
535
536         if (ondisk->options.order > 8 * sizeof (int) - 1)
537                 return false;
538
539         /*
540          * The size of a snapshot header has to fit in a size_t, and
541          * that limits the number of snapshots.
542          */
543         snap_count = le32_to_cpu(ondisk->snap_count);
544         size = SIZE_MAX - sizeof (struct ceph_snap_context);
545         if (snap_count > size / sizeof (__le64))
546                 return false;
547
548         /*
549          * Not only that, but the size of the entire the snapshot
550          * header must also be representable in a size_t.
551          */
552         size -= snap_count * sizeof (__le64);
553         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
554                 return false;
555
556         return true;
557 }
558
559 /*
560  * Create a new header structure, translate header format from the on-disk
561  * header.
562  */
563 static int rbd_header_from_disk(struct rbd_image_header *header,
564                                  struct rbd_image_header_ondisk *ondisk)
565 {
566         u32 snap_count;
567         size_t len;
568         size_t size;
569         u32 i;
570
571         memset(header, 0, sizeof (*header));
572
573         snap_count = le32_to_cpu(ondisk->snap_count);
574
575         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
576         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
577         if (!header->object_prefix)
578                 return -ENOMEM;
579         memcpy(header->object_prefix, ondisk->object_prefix, len);
580         header->object_prefix[len] = '\0';
581
582         if (snap_count) {
583                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
584
585                 /* Save a copy of the snapshot names */
586
587                 if (snap_names_len > (u64) SIZE_MAX)
588                         return -EIO;
589                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
590                 if (!header->snap_names)
591                         goto out_err;
592                 /*
593                  * Note that rbd_dev_v1_header_read() guarantees
594                  * the ondisk buffer we're working with has
595                  * snap_names_len bytes beyond the end of the
596                  * snapshot id array, this memcpy() is safe.
597                  */
598                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
599                         snap_names_len);
600
601                 /* Record each snapshot's size */
602
603                 size = snap_count * sizeof (*header->snap_sizes);
604                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
605                 if (!header->snap_sizes)
606                         goto out_err;
607                 for (i = 0; i < snap_count; i++)
608                         header->snap_sizes[i] =
609                                 le64_to_cpu(ondisk->snaps[i].image_size);
610         } else {
611                 WARN_ON(ondisk->snap_names_len);
612                 header->snap_names = NULL;
613                 header->snap_sizes = NULL;
614         }
615
616         header->features = 0;   /* No features support in v1 images */
617         header->obj_order = ondisk->options.order;
618         header->crypt_type = ondisk->options.crypt_type;
619         header->comp_type = ondisk->options.comp_type;
620
621         /* Allocate and fill in the snapshot context */
622
623         header->image_size = le64_to_cpu(ondisk->image_size);
624         size = sizeof (struct ceph_snap_context);
625         size += snap_count * sizeof (header->snapc->snaps[0]);
626         header->snapc = kzalloc(size, GFP_KERNEL);
627         if (!header->snapc)
628                 goto out_err;
629
630         atomic_set(&header->snapc->nref, 1);
631         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
632         header->snapc->num_snaps = snap_count;
633         for (i = 0; i < snap_count; i++)
634                 header->snapc->snaps[i] =
635                         le64_to_cpu(ondisk->snaps[i].id);
636
637         return 0;
638
639 out_err:
640         kfree(header->snap_sizes);
641         header->snap_sizes = NULL;
642         kfree(header->snap_names);
643         header->snap_names = NULL;
644         kfree(header->object_prefix);
645         header->object_prefix = NULL;
646
647         return -ENOMEM;
648 }
649
650 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
651 {
652
653         struct rbd_snap *snap;
654
655         list_for_each_entry(snap, &rbd_dev->snaps, node) {
656                 if (!strcmp(snap_name, snap->name)) {
657                         rbd_dev->snap_id = snap->id;
658                         rbd_dev->mapping.size = snap->size;
659                         rbd_dev->mapping.features = snap->features;
660
661                         return 0;
662                 }
663         }
664
665         return -ENOENT;
666 }
667
668 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
669 {
670         int ret;
671
672         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
673                     sizeof (RBD_SNAP_HEAD_NAME))) {
674                 rbd_dev->snap_id = CEPH_NOSNAP;
675                 rbd_dev->mapping.size = rbd_dev->header.image_size;
676                 rbd_dev->mapping.features = rbd_dev->header.features;
677                 ret = 0;
678         } else {
679                 ret = snap_by_name(rbd_dev, snap_name);
680                 if (ret < 0)
681                         goto done;
682                 rbd_dev->mapping.read_only = true;
683         }
684         rbd_dev->snap_name = snap_name;
685         rbd_dev->exists = true;
686 done:
687         return ret;
688 }
689
690 static void rbd_header_free(struct rbd_image_header *header)
691 {
692         kfree(header->object_prefix);
693         header->object_prefix = NULL;
694         kfree(header->snap_sizes);
695         header->snap_sizes = NULL;
696         kfree(header->snap_names);
697         header->snap_names = NULL;
698         ceph_put_snap_context(header->snapc);
699         header->snapc = NULL;
700 }
701
702 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
703 {
704         char *name;
705         u64 segment;
706         int ret;
707
708         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
709         if (!name)
710                 return NULL;
711         segment = offset >> rbd_dev->header.obj_order;
712         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
713                         rbd_dev->header.object_prefix, segment);
714         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
715                 pr_err("error formatting segment name for #%llu (%d)\n",
716                         segment, ret);
717                 kfree(name);
718                 name = NULL;
719         }
720
721         return name;
722 }
723
724 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
725 {
726         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
727
728         return offset & (segment_size - 1);
729 }
730
731 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
732                                 u64 offset, u64 length)
733 {
734         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
735
736         offset &= segment_size - 1;
737
738         rbd_assert(length <= U64_MAX - offset);
739         if (offset + length > segment_size)
740                 length = segment_size - offset;
741
742         return length;
743 }
744
745 static int rbd_get_num_segments(struct rbd_image_header *header,
746                                 u64 ofs, u64 len)
747 {
748         u64 start_seg;
749         u64 end_seg;
750
751         if (!len)
752                 return 0;
753         if (len - 1 > U64_MAX - ofs)
754                 return -ERANGE;
755
756         start_seg = ofs >> header->obj_order;
757         end_seg = (ofs + len - 1) >> header->obj_order;
758
759         return end_seg - start_seg + 1;
760 }
761
762 /*
763  * returns the size of an object in the image
764  */
765 static u64 rbd_obj_bytes(struct rbd_image_header *header)
766 {
767         return 1 << header->obj_order;
768 }
769
770 /*
771  * bio helpers
772  */
773
774 static void bio_chain_put(struct bio *chain)
775 {
776         struct bio *tmp;
777
778         while (chain) {
779                 tmp = chain;
780                 chain = chain->bi_next;
781                 bio_put(tmp);
782         }
783 }
784
785 /*
786  * zeros a bio chain, starting at specific offset
787  */
788 static void zero_bio_chain(struct bio *chain, int start_ofs)
789 {
790         struct bio_vec *bv;
791         unsigned long flags;
792         void *buf;
793         int i;
794         int pos = 0;
795
796         while (chain) {
797                 bio_for_each_segment(bv, chain, i) {
798                         if (pos + bv->bv_len > start_ofs) {
799                                 int remainder = max(start_ofs - pos, 0);
800                                 buf = bvec_kmap_irq(bv, &flags);
801                                 memset(buf + remainder, 0,
802                                        bv->bv_len - remainder);
803                                 bvec_kunmap_irq(buf, &flags);
804                         }
805                         pos += bv->bv_len;
806                 }
807
808                 chain = chain->bi_next;
809         }
810 }
811
812 /*
813  * Clone a portion of a bio, starting at the given byte offset
814  * and continuing for the number of bytes indicated.
815  */
816 static struct bio *bio_clone_range(struct bio *bio_src,
817                                         unsigned int offset,
818                                         unsigned int len,
819                                         gfp_t gfpmask)
820 {
821         struct bio_vec *bv;
822         unsigned int resid;
823         unsigned short idx;
824         unsigned int voff;
825         unsigned short end_idx;
826         unsigned short vcnt;
827         struct bio *bio;
828
829         /* Handle the easy case for the caller */
830
831         if (!offset && len == bio_src->bi_size)
832                 return bio_clone(bio_src, gfpmask);
833
834         if (WARN_ON_ONCE(!len))
835                 return NULL;
836         if (WARN_ON_ONCE(len > bio_src->bi_size))
837                 return NULL;
838         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
839                 return NULL;
840
841         /* Find first affected segment... */
842
843         resid = offset;
844         __bio_for_each_segment(bv, bio_src, idx, 0) {
845                 if (resid < bv->bv_len)
846                         break;
847                 resid -= bv->bv_len;
848         }
849         voff = resid;
850
851         /* ...and the last affected segment */
852
853         resid += len;
854         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
855                 if (resid <= bv->bv_len)
856                         break;
857                 resid -= bv->bv_len;
858         }
859         vcnt = end_idx - idx + 1;
860
861         /* Build the clone */
862
863         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
864         if (!bio)
865                 return NULL;    /* ENOMEM */
866
867         bio->bi_bdev = bio_src->bi_bdev;
868         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
869         bio->bi_rw = bio_src->bi_rw;
870         bio->bi_flags |= 1 << BIO_CLONED;
871
872         /*
873          * Copy over our part of the bio_vec, then update the first
874          * and last (or only) entries.
875          */
876         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
877                         vcnt * sizeof (struct bio_vec));
878         bio->bi_io_vec[0].bv_offset += voff;
879         if (vcnt > 1) {
880                 bio->bi_io_vec[0].bv_len -= voff;
881                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
882         } else {
883                 bio->bi_io_vec[0].bv_len = len;
884         }
885
886         bio->bi_vcnt = vcnt;
887         bio->bi_size = len;
888         bio->bi_idx = 0;
889
890         return bio;
891 }
892
893 /*
894  * Clone a portion of a bio chain, starting at the given byte offset
895  * into the first bio in the source chain and continuing for the
896  * number of bytes indicated.  The result is another bio chain of
897  * exactly the given length, or a null pointer on error.
898  *
899  * The bio_src and offset parameters are both in-out.  On entry they
900  * refer to the first source bio and the offset into that bio where
901  * the start of data to be cloned is located.
902  *
903  * On return, bio_src is updated to refer to the bio in the source
904  * chain that contains first un-cloned byte, and *offset will
905  * contain the offset of that byte within that bio.
906  */
907 static struct bio *bio_chain_clone_range(struct bio **bio_src,
908                                         unsigned int *offset,
909                                         unsigned int len,
910                                         gfp_t gfpmask)
911 {
912         struct bio *bi = *bio_src;
913         unsigned int off = *offset;
914         struct bio *chain = NULL;
915         struct bio **end;
916
917         /* Build up a chain of clone bios up to the limit */
918
919         if (!bi || off >= bi->bi_size || !len)
920                 return NULL;            /* Nothing to clone */
921
922         end = &chain;
923         while (len) {
924                 unsigned int bi_size;
925                 struct bio *bio;
926
927                 if (!bi)
928                         goto out_err;   /* EINVAL; ran out of bio's */
929                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
930                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
931                 if (!bio)
932                         goto out_err;   /* ENOMEM */
933
934                 *end = bio;
935                 end = &bio->bi_next;
936
937                 off += bi_size;
938                 if (off == bi->bi_size) {
939                         bi = bi->bi_next;
940                         off = 0;
941                 }
942                 len -= bi_size;
943         }
944         *bio_src = bi;
945         *offset = off;
946
947         return chain;
948 out_err:
949         bio_chain_put(chain);
950
951         return NULL;
952 }
953
954 /*
955  * helpers for osd request op vectors.
956  */
957 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
958                                         int opcode, u32 payload_len)
959 {
960         struct ceph_osd_req_op *ops;
961
962         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
963         if (!ops)
964                 return NULL;
965
966         ops[0].op = opcode;
967
968         /*
969          * op extent offset and length will be set later on
970          * in calc_raw_layout()
971          */
972         ops[0].payload_len = payload_len;
973
974         return ops;
975 }
976
/* Free an op vector; ops is handed straight to kfree() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
981
982 static void rbd_coll_end_req_index(struct request *rq,
983                                    struct rbd_req_coll *coll,
984                                    int index,
985                                    int ret, u64 len)
986 {
987         struct request_queue *q;
988         int min, max, i;
989
990         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
991              coll, index, ret, (unsigned long long) len);
992
993         if (!rq)
994                 return;
995
996         if (!coll) {
997                 blk_end_request(rq, ret, len);
998                 return;
999         }
1000
1001         q = rq->q;
1002
1003         spin_lock_irq(q->queue_lock);
1004         coll->status[index].done = 1;
1005         coll->status[index].rc = ret;
1006         coll->status[index].bytes = len;
1007         max = min = coll->num_done;
1008         while (max < coll->total && coll->status[max].done)
1009                 max++;
1010
1011         for (i = min; i<max; i++) {
1012                 __blk_end_request(rq, coll->status[i].rc,
1013                                   coll->status[i].bytes);
1014                 coll->num_done++;
1015                 kref_put(&coll->kref, rbd_coll_release);
1016         }
1017         spin_unlock_irq(q->queue_lock);
1018 }
1019
/*
 * Convenience wrapper: complete the piece described by an rbd_request,
 * using the request, collection and slot index recorded in it.
 */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
1025
1026 /*
1027  * Send ceph osd request
1028  */
1029 static int rbd_do_request(struct request *rq,
1030                           struct rbd_device *rbd_dev,
1031                           struct ceph_snap_context *snapc,
1032                           u64 snapid,
1033                           const char *object_name, u64 ofs, u64 len,
1034                           struct bio *bio,
1035                           struct page **pages,
1036                           int num_pages,
1037                           int flags,
1038                           struct ceph_osd_req_op *ops,
1039                           struct rbd_req_coll *coll,
1040                           int coll_index,
1041                           void (*rbd_cb)(struct ceph_osd_request *req,
1042                                          struct ceph_msg *msg),
1043                           struct ceph_osd_request **linger_req,
1044                           u64 *ver)
1045 {
1046         struct ceph_osd_request *req;
1047         struct ceph_file_layout *layout;
1048         int ret;
1049         u64 bno;
1050         struct timespec mtime = CURRENT_TIME;
1051         struct rbd_request *req_data;
1052         struct ceph_osd_request_head *reqhead;
1053         struct ceph_osd_client *osdc;
1054
1055         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1056         if (!req_data) {
1057                 if (coll)
1058                         rbd_coll_end_req_index(rq, coll, coll_index,
1059                                                -ENOMEM, len);
1060                 return -ENOMEM;
1061         }
1062
1063         if (coll) {
1064                 req_data->coll = coll;
1065                 req_data->coll_index = coll_index;
1066         }
1067
1068         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1069                 object_name, (unsigned long long) ofs,
1070                 (unsigned long long) len, coll, coll_index);
1071
1072         osdc = &rbd_dev->rbd_client->client->osdc;
1073         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1074                                         false, GFP_NOIO, pages, bio);
1075         if (!req) {
1076                 ret = -ENOMEM;
1077                 goto done_pages;
1078         }
1079
1080         req->r_callback = rbd_cb;
1081
1082         req_data->rq = rq;
1083         req_data->bio = bio;
1084         req_data->pages = pages;
1085         req_data->len = len;
1086
1087         req->r_priv = req_data;
1088
1089         reqhead = req->r_request->front.iov_base;
1090         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1091
1092         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1093         req->r_oid_len = strlen(req->r_oid);
1094
1095         layout = &req->r_file_layout;
1096         memset(layout, 0, sizeof(*layout));
1097         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1098         layout->fl_stripe_count = cpu_to_le32(1);
1099         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1100         layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
1101         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1102                                    req, ops);
1103         rbd_assert(ret == 0);
1104
1105         ceph_osdc_build_request(req, ofs, &len,
1106                                 ops,
1107                                 snapc,
1108                                 &mtime,
1109                                 req->r_oid, req->r_oid_len);
1110
1111         if (linger_req) {
1112                 ceph_osdc_set_request_linger(osdc, req);
1113                 *linger_req = req;
1114         }
1115
1116         ret = ceph_osdc_start_request(osdc, req, false);
1117         if (ret < 0)
1118                 goto done_err;
1119
1120         if (!rbd_cb) {
1121                 ret = ceph_osdc_wait_request(osdc, req);
1122                 if (ver)
1123                         *ver = le64_to_cpu(req->r_reassert_version.version);
1124                 dout("reassert_ver=%llu\n",
1125                         (unsigned long long)
1126                                 le64_to_cpu(req->r_reassert_version.version));
1127                 ceph_osdc_put_request(req);
1128         }
1129         return ret;
1130
1131 done_err:
1132         bio_chain_put(req_data->bio);
1133         ceph_osdc_put_request(req);
1134 done_pages:
1135         rbd_coll_end_req(req_data, ret, len);
1136         kfree(req_data);
1137         return ret;
1138 }
1139
1140 /*
1141  * Ceph osd op callback
1142  */
1143 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1144 {
1145         struct rbd_request *req_data = req->r_priv;
1146         struct ceph_osd_reply_head *replyhead;
1147         struct ceph_osd_op *op;
1148         __s32 rc;
1149         u64 bytes;
1150         int read_op;
1151
1152         /* parse reply */
1153         replyhead = msg->front.iov_base;
1154         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1155         op = (void *)(replyhead + 1);
1156         rc = le32_to_cpu(replyhead->result);
1157         bytes = le64_to_cpu(op->extent.length);
1158         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1159
1160         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1161                 (unsigned long long) bytes, read_op, (int) rc);
1162
1163         if (rc == -ENOENT && read_op) {
1164                 zero_bio_chain(req_data->bio, 0);
1165                 rc = 0;
1166         } else if (rc == 0 && read_op && bytes < req_data->len) {
1167                 zero_bio_chain(req_data->bio, bytes);
1168                 bytes = req_data->len;
1169         }
1170
1171         rbd_coll_end_req(req_data, rc, bytes);
1172
1173         if (req_data->bio)
1174                 bio_chain_put(req_data->bio);
1175
1176         ceph_osdc_put_request(req);
1177         kfree(req_data);
1178 }
1179
1180 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1181 {
1182         ceph_osdc_put_request(req);
1183 }
1184
1185 /*
1186  * Do a synchronous ceph osd operation
1187  */
1188 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1189                            struct ceph_snap_context *snapc,
1190                            u64 snapid,
1191                            int flags,
1192                            struct ceph_osd_req_op *ops,
1193                            const char *object_name,
1194                            u64 ofs, u64 inbound_size,
1195                            char *inbound,
1196                            struct ceph_osd_request **linger_req,
1197                            u64 *ver)
1198 {
1199         int ret;
1200         struct page **pages;
1201         int num_pages;
1202
1203         rbd_assert(ops != NULL);
1204
1205         num_pages = calc_pages_for(ofs, inbound_size);
1206         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1207         if (IS_ERR(pages))
1208                 return PTR_ERR(pages);
1209
1210         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1211                           object_name, ofs, inbound_size, NULL,
1212                           pages, num_pages,
1213                           flags,
1214                           ops,
1215                           NULL, 0,
1216                           NULL,
1217                           linger_req, ver);
1218         if (ret < 0)
1219                 goto done;
1220
1221         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1222                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1223
1224 done:
1225         ceph_release_page_vector(pages, num_pages);
1226         return ret;
1227 }
1228
1229 /*
1230  * Do an asynchronous ceph osd operation
1231  */
1232 static int rbd_do_op(struct request *rq,
1233                      struct rbd_device *rbd_dev,
1234                      struct ceph_snap_context *snapc,
1235                      u64 ofs, u64 len,
1236                      struct bio *bio,
1237                      struct rbd_req_coll *coll,
1238                      int coll_index)
1239 {
1240         char *seg_name;
1241         u64 seg_ofs;
1242         u64 seg_len;
1243         int ret;
1244         struct ceph_osd_req_op *ops;
1245         u32 payload_len;
1246         int opcode;
1247         int flags;
1248         u64 snapid;
1249
1250         seg_name = rbd_segment_name(rbd_dev, ofs);
1251         if (!seg_name)
1252                 return -ENOMEM;
1253         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1254         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1255
1256         if (rq_data_dir(rq) == WRITE) {
1257                 opcode = CEPH_OSD_OP_WRITE;
1258                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1259                 snapid = CEPH_NOSNAP;
1260                 payload_len = seg_len;
1261         } else {
1262                 opcode = CEPH_OSD_OP_READ;
1263                 flags = CEPH_OSD_FLAG_READ;
1264                 snapc = NULL;
1265                 snapid = rbd_dev->snap_id;
1266                 payload_len = 0;
1267         }
1268
1269         ret = -ENOMEM;
1270         ops = rbd_create_rw_ops(1, opcode, payload_len);
1271         if (!ops)
1272                 goto done;
1273
1274         /* we've taken care of segment sizes earlier when we
1275            cloned the bios. We should never have a segment
1276            truncated at this point */
1277         rbd_assert(seg_len == len);
1278
1279         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1280                              seg_name, seg_ofs, seg_len,
1281                              bio,
1282                              NULL, 0,
1283                              flags,
1284                              ops,
1285                              coll, coll_index,
1286                              rbd_req_cb, 0, NULL);
1287
1288         rbd_destroy_ops(ops);
1289 done:
1290         kfree(seg_name);
1291         return ret;
1292 }
1293
1294 /*
1295  * Request sync osd read
1296  */
1297 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1298                           u64 snapid,
1299                           const char *object_name,
1300                           u64 ofs, u64 len,
1301                           char *buf,
1302                           u64 *ver)
1303 {
1304         struct ceph_osd_req_op *ops;
1305         int ret;
1306
1307         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1308         if (!ops)
1309                 return -ENOMEM;
1310
1311         ret = rbd_req_sync_op(rbd_dev, NULL,
1312                                snapid,
1313                                CEPH_OSD_FLAG_READ,
1314                                ops, object_name, ofs, len, buf, NULL, ver);
1315         rbd_destroy_ops(ops);
1316
1317         return ret;
1318 }
1319
1320 /*
1321  * Request sync osd watch
1322  */
1323 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1324                                    u64 ver,
1325                                    u64 notify_id)
1326 {
1327         struct ceph_osd_req_op *ops;
1328         int ret;
1329
1330         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1331         if (!ops)
1332                 return -ENOMEM;
1333
1334         ops[0].watch.ver = cpu_to_le64(ver);
1335         ops[0].watch.cookie = notify_id;
1336         ops[0].watch.flag = 0;
1337
1338         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1339                           rbd_dev->header_name, 0, 0, NULL,
1340                           NULL, 0,
1341                           CEPH_OSD_FLAG_READ,
1342                           ops,
1343                           NULL, 0,
1344                           rbd_simple_req_cb, 0, NULL);
1345
1346         rbd_destroy_ops(ops);
1347         return ret;
1348 }
1349
1350 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1351 {
1352         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1353         u64 hver;
1354         int rc;
1355
1356         if (!rbd_dev)
1357                 return;
1358
1359         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1360                 rbd_dev->header_name, (unsigned long long) notify_id,
1361                 (unsigned int) opcode);
1362         rc = rbd_dev_refresh(rbd_dev, &hver);
1363         if (rc)
1364                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1365                            " update snaps: %d\n", rbd_dev->major, rc);
1366
1367         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1368 }
1369
1370 /*
1371  * Request sync osd watch
1372  */
1373 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1374 {
1375         struct ceph_osd_req_op *ops;
1376         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1377         int ret;
1378
1379         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1380         if (!ops)
1381                 return -ENOMEM;
1382
1383         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1384                                      (void *)rbd_dev, &rbd_dev->watch_event);
1385         if (ret < 0)
1386                 goto fail;
1387
1388         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1389         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1390         ops[0].watch.flag = 1;
1391
1392         ret = rbd_req_sync_op(rbd_dev, NULL,
1393                               CEPH_NOSNAP,
1394                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1395                               ops,
1396                               rbd_dev->header_name,
1397                               0, 0, NULL,
1398                               &rbd_dev->watch_request, NULL);
1399
1400         if (ret < 0)
1401                 goto fail_event;
1402
1403         rbd_destroy_ops(ops);
1404         return 0;
1405
1406 fail_event:
1407         ceph_osdc_cancel_event(rbd_dev->watch_event);
1408         rbd_dev->watch_event = NULL;
1409 fail:
1410         rbd_destroy_ops(ops);
1411         return ret;
1412 }
1413
1414 /*
1415  * Request sync osd unwatch
1416  */
1417 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1418 {
1419         struct ceph_osd_req_op *ops;
1420         int ret;
1421
1422         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1423         if (!ops)
1424                 return -ENOMEM;
1425
1426         ops[0].watch.ver = 0;
1427         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1428         ops[0].watch.flag = 0;
1429
1430         ret = rbd_req_sync_op(rbd_dev, NULL,
1431                               CEPH_NOSNAP,
1432                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1433                               ops,
1434                               rbd_dev->header_name,
1435                               0, 0, NULL, NULL, NULL);
1436
1437
1438         rbd_destroy_ops(ops);
1439         ceph_osdc_cancel_event(rbd_dev->watch_event);
1440         rbd_dev->watch_event = NULL;
1441         return ret;
1442 }
1443
1444 /*
1445  * Synchronous osd object method call
1446  */
1447 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1448                              const char *object_name,
1449                              const char *class_name,
1450                              const char *method_name,
1451                              const char *outbound,
1452                              size_t outbound_size,
1453                              char *inbound,
1454                              size_t inbound_size,
1455                              int flags,
1456                              u64 *ver)
1457 {
1458         struct ceph_osd_req_op *ops;
1459         int class_name_len = strlen(class_name);
1460         int method_name_len = strlen(method_name);
1461         int payload_size;
1462         int ret;
1463
1464         /*
1465          * Any input parameters required by the method we're calling
1466          * will be sent along with the class and method names as
1467          * part of the message payload.  That data and its size are
1468          * supplied via the indata and indata_len fields (named from
1469          * the perspective of the server side) in the OSD request
1470          * operation.
1471          */
1472         payload_size = class_name_len + method_name_len + outbound_size;
1473         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1474         if (!ops)
1475                 return -ENOMEM;
1476
1477         ops[0].cls.class_name = class_name;
1478         ops[0].cls.class_len = (__u8) class_name_len;
1479         ops[0].cls.method_name = method_name;
1480         ops[0].cls.method_len = (__u8) method_name_len;
1481         ops[0].cls.argc = 0;
1482         ops[0].cls.indata = outbound;
1483         ops[0].cls.indata_len = outbound_size;
1484
1485         ret = rbd_req_sync_op(rbd_dev, NULL,
1486                                CEPH_NOSNAP,
1487                                flags, ops,
1488                                object_name, 0, inbound_size, inbound,
1489                                NULL, ver);
1490
1491         rbd_destroy_ops(ops);
1492
1493         dout("cls_exec returned %d\n", ret);
1494         return ret;
1495 }
1496
1497 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1498 {
1499         struct rbd_req_coll *coll =
1500                         kzalloc(sizeof(struct rbd_req_coll) +
1501                                 sizeof(struct rbd_req_status) * num_reqs,
1502                                 GFP_ATOMIC);
1503
1504         if (!coll)
1505                 return NULL;
1506         coll->total = num_reqs;
1507         kref_init(&coll->kref);
1508         return coll;
1509 }
1510
/*
 * block device queue callback
 *
 * Drains the request queue.  Each file system request is split at
 * segment (object) boundaries; every piece gets its own cloned bio
 * chain and is issued through rbd_do_op(), with a request collection
 * tracking completion of the pieces.
 *
 * q->queue_lock is dropped while a request is being set up and
 * issued, and retaken before the next request is fetched.  Requests
 * rejected before that point are ended with the lock still held.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                bool do_write;
                unsigned int size;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
                unsigned int bio_offset;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                /* From here on the queue lock is not held */
                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                if (!rbd_dev->exists) {
                        rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* Take our own reference; it is dropped on every exit path */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                bio = rq->bio;

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                /* A non-positive segment count ends the request with that value */
                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                /* Walk the request one segment-sized piece at a time */
                bio_offset = 0;
                do {
                        u64 limit = rbd_segment_length(rbd_dev, ofs, size);
                        unsigned int chain_size;
                        struct bio *bio_chain;

                        BUG_ON(limit > (u64) UINT_MAX);
                        chain_size = (unsigned int) limit;
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

                        /* One collection reference per piece */
                        kref_get(&coll->kref);

                        /* Pass a cloned bio chain via an osd request */

                        bio_chain = bio_chain_clone_range(&bio,
                                                &bio_offset, chain_size,
                                                GFP_ATOMIC);
                        if (bio_chain)
                                (void) rbd_do_op(rq, rbd_dev, snapc,
                                                ofs, chain_size,
                                                bio_chain, coll, cur_seg);
                        else
                                /* Cloning failed: fail just this piece */
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, chain_size);
                        size -= chain_size;
                        ofs += chain_size;

                        cur_seg++;
                } while (size > 0);
                /* Drop the reference set up by rbd_alloc_coll() */
                kref_put(&coll->kref, rbd_coll_release);

                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1620
1621 /*
1622  * a queue callback. Makes sure that we don't create a bio that spans across
1623  * multiple osd objects. One exception would be with a single page bios,
1624  * which we handle later at bio_chain_clone_range()
1625  */
1626 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1627                           struct bio_vec *bvec)
1628 {
1629         struct rbd_device *rbd_dev = q->queuedata;
1630         sector_t sector_offset;
1631         sector_t sectors_per_obj;
1632         sector_t obj_sector_offset;
1633         int ret;
1634
1635         /*
1636          * Find how far into its rbd object the partition-relative
1637          * bio start sector is to offset relative to the enclosing
1638          * device.
1639          */
1640         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1641         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1642         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1643
1644         /*
1645          * Compute the number of bytes from that offset to the end
1646          * of the object.  Account for what's already used by the bio.
1647          */
1648         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1649         if (ret > bmd->bi_size)
1650                 ret -= bmd->bi_size;
1651         else
1652                 ret = 0;
1653
1654         /*
1655          * Don't send back more than was asked for.  And if the bio
1656          * was empty, let the whole thing through because:  "Note
1657          * that a block device *must* allow a single page to be
1658          * added to an empty bio."
1659          */
1660         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1661         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1662                 ret = (int) bvec->bv_len;
1663
1664         return ret;
1665 }
1666
1667 static void rbd_free_disk(struct rbd_device *rbd_dev)
1668 {
1669         struct gendisk *disk = rbd_dev->disk;
1670
1671         if (!disk)
1672                 return;
1673
1674         if (disk->flags & GENHD_FL_UP)
1675                 del_gendisk(disk);
1676         if (disk->queue)
1677                 blk_cleanup_queue(disk->queue);
1678         put_disk(disk);
1679 }
1680
1681 /*
1682  * Read the complete header for the given rbd device.
1683  *
1684  * Returns a pointer to a dynamically-allocated buffer containing
1685  * the complete and validated header.  Caller can pass the address
1686  * of a variable that will be filled in with the version of the
1687  * header object at the time it was read.
1688  *
1689  * Returns a pointer-coded errno if a failure occurs.
1690  */
1691 static struct rbd_image_header_ondisk *
1692 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1693 {
1694         struct rbd_image_header_ondisk *ondisk = NULL;
1695         u32 snap_count = 0;
1696         u64 names_size = 0;
1697         u32 want_count;
1698         int ret;
1699
1700         /*
1701          * The complete header will include an array of its 64-bit
1702          * snapshot ids, followed by the names of those snapshots as
1703          * a contiguous block of NUL-terminated strings.  Note that
1704          * the number of snapshots could change by the time we read
1705          * it in, in which case we re-read it.
1706          */
1707         do {
1708                 size_t size;
1709
1710                 kfree(ondisk);
1711
1712                 size = sizeof (*ondisk);
1713                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1714                 size += names_size;
1715                 ondisk = kmalloc(size, GFP_KERNEL);
1716                 if (!ondisk)
1717                         return ERR_PTR(-ENOMEM);
1718
1719                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1720                                        rbd_dev->header_name,
1721                                        0, size,
1722                                        (char *) ondisk, version);
1723
1724                 if (ret < 0)
1725                         goto out_err;
1726                 if (WARN_ON((size_t) ret < size)) {
1727                         ret = -ENXIO;
1728                         pr_warning("short header read for image %s"
1729                                         " (want %zd got %d)\n",
1730                                 rbd_dev->image_name, size, ret);
1731                         goto out_err;
1732                 }
1733                 if (!rbd_dev_ondisk_valid(ondisk)) {
1734                         ret = -ENXIO;
1735                         pr_warning("invalid header for image %s\n",
1736                                 rbd_dev->image_name);
1737                         goto out_err;
1738                 }
1739
1740                 names_size = le64_to_cpu(ondisk->snap_names_len);
1741                 want_count = snap_count;
1742                 snap_count = le32_to_cpu(ondisk->snap_count);
1743         } while (snap_count != want_count);
1744
1745         return ondisk;
1746
1747 out_err:
1748         kfree(ondisk);
1749
1750         return ERR_PTR(ret);
1751 }
1752
1753 /*
1754  * reload the ondisk the header
1755  */
1756 static int rbd_read_header(struct rbd_device *rbd_dev,
1757                            struct rbd_image_header *header)
1758 {
1759         struct rbd_image_header_ondisk *ondisk;
1760         u64 ver = 0;
1761         int ret;
1762
1763         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1764         if (IS_ERR(ondisk))
1765                 return PTR_ERR(ondisk);
1766         ret = rbd_header_from_disk(header, ondisk);
1767         if (ret >= 0)
1768                 header->obj_version = ver;
1769         kfree(ondisk);
1770
1771         return ret;
1772 }
1773
1774 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1775 {
1776         struct rbd_snap *snap;
1777         struct rbd_snap *next;
1778
1779         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1780                 rbd_remove_snap_dev(snap);
1781 }
1782
1783 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1784 {
1785         sector_t size;
1786
1787         if (rbd_dev->snap_id != CEPH_NOSNAP)
1788                 return;
1789
1790         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1791         dout("setting size to %llu sectors", (unsigned long long) size);
1792         rbd_dev->mapping.size = (u64) size;
1793         set_capacity(rbd_dev->disk, size);
1794 }
1795
/*
 * Re-read the on-disk header of a format 1 image and install the
 * updated image size, snapshot context, snapshot names and snapshot
 * sizes in rbd_dev->header, then update and register the device's
 * snapshots.
 *
 * If hver is non-NULL it receives the version of the header object
 * that was read.  Returns 0 on success or a negative errno; if the
 * header cannot be read, *hver is left untouched.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        /* Read into a scratch header before taking the write lock */
        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* Update image size, and check for resize of mapped image */
        rbd_dev->header.image_size = h.image_size;
        rbd_update_mapping_size(rbd_dev);

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        /* Install the freshly read values in place of the old ones */
        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        /* Only register snapshots if the update itself succeeded */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (!ret)
                ret = rbd_dev_snaps_register(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1839
1840 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1841 {
1842         int ret;
1843
1844         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1845         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1846         if (rbd_dev->image_format == 1)
1847                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1848         else
1849                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1850         mutex_unlock(&ctl_mutex);
1851
1852         return ret;
1853 }
1854
1855 static int rbd_init_disk(struct rbd_device *rbd_dev)
1856 {
1857         struct gendisk *disk;
1858         struct request_queue *q;
1859         u64 segment_size;
1860
1861         /* create gendisk info */
1862         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1863         if (!disk)
1864                 return -ENOMEM;
1865
1866         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1867                  rbd_dev->dev_id);
1868         disk->major = rbd_dev->major;
1869         disk->first_minor = 0;
1870         disk->fops = &rbd_bd_ops;
1871         disk->private_data = rbd_dev;
1872
1873         /* init rq */
1874         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1875         if (!q)
1876                 goto out_disk;
1877
1878         /* We use the default size, but let's be explicit about it. */
1879         blk_queue_physical_block_size(q, SECTOR_SIZE);
1880
1881         /* set io sizes to object size */
1882         segment_size = rbd_obj_bytes(&rbd_dev->header);
1883         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1884         blk_queue_max_segment_size(q, segment_size);
1885         blk_queue_io_min(q, segment_size);
1886         blk_queue_io_opt(q, segment_size);
1887
1888         blk_queue_merge_bvec(q, rbd_merge_bvec);
1889         disk->queue = q;
1890
1891         q->queuedata = rbd_dev;
1892
1893         rbd_dev->disk = disk;
1894
1895         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1896
1897         return 0;
1898 out_disk:
1899         put_disk(disk);
1900
1901         return -ENOMEM;
1902 }
1903
1904 /*
1905   sysfs
1906 */
1907
1908 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1909 {
1910         return container_of(dev, struct rbd_device, dev);
1911 }
1912
1913 static ssize_t rbd_size_show(struct device *dev,
1914                              struct device_attribute *attr, char *buf)
1915 {
1916         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1917         sector_t size;
1918
1919         down_read(&rbd_dev->header_rwsem);
1920         size = get_capacity(rbd_dev->disk);
1921         up_read(&rbd_dev->header_rwsem);
1922
1923         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1924 }
1925
1926 /*
1927  * Note this shows the features for whatever's mapped, which is not
1928  * necessarily the base image.
1929  */
1930 static ssize_t rbd_features_show(struct device *dev,
1931                              struct device_attribute *attr, char *buf)
1932 {
1933         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934
1935         return sprintf(buf, "0x%016llx\n",
1936                         (unsigned long long) rbd_dev->mapping.features);
1937 }
1938
1939 static ssize_t rbd_major_show(struct device *dev,
1940                               struct device_attribute *attr, char *buf)
1941 {
1942         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943
1944         return sprintf(buf, "%d\n", rbd_dev->major);
1945 }
1946
1947 static ssize_t rbd_client_id_show(struct device *dev,
1948                                   struct device_attribute *attr, char *buf)
1949 {
1950         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951
1952         return sprintf(buf, "client%lld\n",
1953                         ceph_client_id(rbd_dev->rbd_client->client));
1954 }
1955
1956 static ssize_t rbd_pool_show(struct device *dev,
1957                              struct device_attribute *attr, char *buf)
1958 {
1959         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960
1961         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1962 }
1963
1964 static ssize_t rbd_pool_id_show(struct device *dev,
1965                              struct device_attribute *attr, char *buf)
1966 {
1967         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968
1969         return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
1970 }
1971
1972 static ssize_t rbd_name_show(struct device *dev,
1973                              struct device_attribute *attr, char *buf)
1974 {
1975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976
1977         return sprintf(buf, "%s\n", rbd_dev->image_name);
1978 }
1979
1980 static ssize_t rbd_image_id_show(struct device *dev,
1981                              struct device_attribute *attr, char *buf)
1982 {
1983         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984
1985         return sprintf(buf, "%s\n", rbd_dev->image_id);
1986 }
1987
1988 /*
1989  * Shows the name of the currently-mapped snapshot (or
1990  * RBD_SNAP_HEAD_NAME for the base image).
1991  */
1992 static ssize_t rbd_snap_show(struct device *dev,
1993                              struct device_attribute *attr,
1994                              char *buf)
1995 {
1996         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1997
1998         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1999 }
2000
2001 static ssize_t rbd_image_refresh(struct device *dev,
2002                                  struct device_attribute *attr,
2003                                  const char *buf,
2004                                  size_t size)
2005 {
2006         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2007         int ret;
2008
2009         ret = rbd_dev_refresh(rbd_dev, NULL);
2010
2011         return ret < 0 ? ret : size;
2012 }
2013
/*
 * Per-device sysfs attributes.  All are read-only (S_IRUGO) except
 * "refresh", which is write-only for the owner (S_IWUSR) and has only
 * a store handler.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

/* NULL-terminated list of the attributes defined above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

/* The single (unnamed) attribute group for an rbd device */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
2047
/*
 * Release callback installed in rbd_device_type.  It does nothing.
 * NOTE(review): rbd_bus_add_dev() also sets dev->release to
 * rbd_dev_release, which presumably performs the real teardown --
 * confirm before relying on this callback.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2051
/*
 * Device type assigned to every rbd device by rbd_bus_add_dev(); it
 * supplies the sysfs attribute groups defined above.
 */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2057
2058
2059 /*
2060   sysfs - snapshots
2061 */
2062
2063 static ssize_t rbd_snap_size_show(struct device *dev,
2064                                   struct device_attribute *attr,
2065                                   char *buf)
2066 {
2067         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2068
2069         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2070 }
2071
2072 static ssize_t rbd_snap_id_show(struct device *dev,
2073                                 struct device_attribute *attr,
2074                                 char *buf)
2075 {
2076         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2077
2078         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2079 }
2080
2081 static ssize_t rbd_snap_features_show(struct device *dev,
2082                                 struct device_attribute *attr,
2083                                 char *buf)
2084 {
2085         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2086
2087         return sprintf(buf, "0x%016llx\n",
2088                         (unsigned long long) snap->features);
2089 }
2090
/* Per-snapshot sysfs attributes; all are read-only (S_IRUGO) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the snapshot attributes defined above */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

/* The single (unnamed) attribute group for a snapshot device */
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2105
2106 static void rbd_snap_dev_release(struct device *dev)
2107 {
2108         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2109         kfree(snap->name);
2110         kfree(snap);
2111 }
2112
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/*
 * Device type assigned to a snapshot device by
 * rbd_register_snap_dev().  rbd_snap_registered() also compares
 * against its address.
 */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2122
2123 static bool rbd_snap_registered(struct rbd_snap *snap)
2124 {
2125         bool ret = snap->dev.type == &rbd_snap_device_type;
2126         bool reg = device_is_registered(&snap->dev);
2127
2128         rbd_assert(!ret ^ reg);
2129
2130         return ret;
2131 }
2132
2133 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2134 {
2135         list_del(&snap->node);
2136         if (device_is_registered(&snap->dev))
2137                 device_unregister(&snap->dev);
2138 }
2139
2140 static int rbd_register_snap_dev(struct rbd_snap *snap,
2141                                   struct device *parent)
2142 {
2143         struct device *dev = &snap->dev;
2144         int ret;
2145
2146         dev->type = &rbd_snap_device_type;
2147         dev->parent = parent;
2148         dev->release = rbd_snap_dev_release;
2149         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2150         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2151
2152         ret = device_register(dev);
2153
2154         return ret;
2155 }
2156
2157 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2158                                                 const char *snap_name,
2159                                                 u64 snap_id, u64 snap_size,
2160                                                 u64 snap_features)
2161 {
2162         struct rbd_snap *snap;
2163         int ret;
2164
2165         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2166         if (!snap)
2167                 return ERR_PTR(-ENOMEM);
2168
2169         ret = -ENOMEM;
2170         snap->name = kstrdup(snap_name, GFP_KERNEL);
2171         if (!snap->name)
2172                 goto err;
2173
2174         snap->id = snap_id;
2175         snap->size = snap_size;
2176         snap->features = snap_features;
2177
2178         return snap;
2179
2180 err:
2181         kfree(snap->name);
2182         kfree(snap);
2183
2184         return ERR_PTR(ret);
2185 }
2186
2187 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2188                 u64 *snap_size, u64 *snap_features)
2189 {
2190         char *snap_name;
2191
2192         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2193
2194         *snap_size = rbd_dev->header.snap_sizes[which];
2195         *snap_features = 0;     /* No features for v1 */
2196
2197         /* Skip over names until we find the one we are looking for */
2198
2199         snap_name = rbd_dev->header.snap_names;
2200         while (which--)
2201                 snap_name += strlen(snap_name) + 1;
2202
2203         return snap_name;
2204 }
2205
2206 /*
2207  * Get the size and object order for an image snapshot, or if
2208  * snap_id is CEPH_NOSNAP, gets this information for the base
2209  * image.
2210  */
2211 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2212                                 u8 *order, u64 *snap_size)
2213 {
2214         __le64 snapid = cpu_to_le64(snap_id);
2215         int ret;
2216         struct {
2217                 u8 order;
2218                 __le64 size;
2219         } __attribute__ ((packed)) size_buf = { 0 };
2220
2221         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2222                                 "rbd", "get_size",
2223                                 (char *) &snapid, sizeof (snapid),
2224                                 (char *) &size_buf, sizeof (size_buf),
2225                                 CEPH_OSD_FLAG_READ, NULL);
2226         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2227         if (ret < 0)
2228                 return ret;
2229
2230         *order = size_buf.order;
2231         *snap_size = le64_to_cpu(size_buf.size);
2232
2233         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2234                 (unsigned long long) snap_id, (unsigned int) *order,
2235                 (unsigned long long) *snap_size);
2236
2237         return 0;
2238 }
2239
2240 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2241 {
2242         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2243                                         &rbd_dev->header.obj_order,
2244                                         &rbd_dev->header.image_size);
2245 }
2246
2247 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2248 {
2249         void *reply_buf;
2250         int ret;
2251         void *p;
2252
2253         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2254         if (!reply_buf)
2255                 return -ENOMEM;
2256
2257         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2258                                 "rbd", "get_object_prefix",
2259                                 NULL, 0,
2260                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2261                                 CEPH_OSD_FLAG_READ, NULL);
2262         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2263         if (ret < 0)
2264                 goto out;
2265         ret = 0;    /* rbd_req_sync_exec() can return positive */
2266
2267         p = reply_buf;
2268         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2269                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2270                                                 NULL, GFP_NOIO);
2271
2272         if (IS_ERR(rbd_dev->header.object_prefix)) {
2273                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2274                 rbd_dev->header.object_prefix = NULL;
2275         } else {
2276                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2277         }
2278
2279 out:
2280         kfree(reply_buf);
2281
2282         return ret;
2283 }
2284
2285 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2286                 u64 *snap_features)
2287 {
2288         __le64 snapid = cpu_to_le64(snap_id);
2289         struct {
2290                 __le64 features;
2291                 __le64 incompat;
2292         } features_buf = { 0 };
2293         u64 incompat;
2294         int ret;
2295
2296         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2297                                 "rbd", "get_features",
2298                                 (char *) &snapid, sizeof (snapid),
2299                                 (char *) &features_buf, sizeof (features_buf),
2300                                 CEPH_OSD_FLAG_READ, NULL);
2301         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2302         if (ret < 0)
2303                 return ret;
2304
2305         incompat = le64_to_cpu(features_buf.incompat);
2306         if (incompat & ~RBD_FEATURES_ALL)
2307                 return -ENOTSUPP;
2308
2309         *snap_features = le64_to_cpu(features_buf.features);
2310
2311         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2312                 (unsigned long long) snap_id,
2313                 (unsigned long long) *snap_features,
2314                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2315
2316         return 0;
2317 }
2318
2319 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2320 {
2321         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2322                                                 &rbd_dev->header.features);
2323 }
2324
2325 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2326 {
2327         size_t size;
2328         int ret;
2329         void *reply_buf;
2330         void *p;
2331         void *end;
2332         u64 seq;
2333         u32 snap_count;
2334         struct ceph_snap_context *snapc;
2335         u32 i;
2336
2337         /*
2338          * We'll need room for the seq value (maximum snapshot id),
2339          * snapshot count, and array of that many snapshot ids.
2340          * For now we have a fixed upper limit on the number we're
2341          * prepared to receive.
2342          */
2343         size = sizeof (__le64) + sizeof (__le32) +
2344                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2345         reply_buf = kzalloc(size, GFP_KERNEL);
2346         if (!reply_buf)
2347                 return -ENOMEM;
2348
2349         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2350                                 "rbd", "get_snapcontext",
2351                                 NULL, 0,
2352                                 reply_buf, size,
2353                                 CEPH_OSD_FLAG_READ, ver);
2354         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2355         if (ret < 0)
2356                 goto out;
2357
2358         ret = -ERANGE;
2359         p = reply_buf;
2360         end = (char *) reply_buf + size;
2361         ceph_decode_64_safe(&p, end, seq, out);
2362         ceph_decode_32_safe(&p, end, snap_count, out);
2363
2364         /*
2365          * Make sure the reported number of snapshot ids wouldn't go
2366          * beyond the end of our buffer.  But before checking that,
2367          * make sure the computed size of the snapshot context we
2368          * allocate is representable in a size_t.
2369          */
2370         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2371                                  / sizeof (u64)) {
2372                 ret = -EINVAL;
2373                 goto out;
2374         }
2375         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2376                 goto out;
2377
2378         size = sizeof (struct ceph_snap_context) +
2379                                 snap_count * sizeof (snapc->snaps[0]);
2380         snapc = kmalloc(size, GFP_KERNEL);
2381         if (!snapc) {
2382                 ret = -ENOMEM;
2383                 goto out;
2384         }
2385
2386         atomic_set(&snapc->nref, 1);
2387         snapc->seq = seq;
2388         snapc->num_snaps = snap_count;
2389         for (i = 0; i < snap_count; i++)
2390                 snapc->snaps[i] = ceph_decode_64(&p);
2391
2392         rbd_dev->header.snapc = snapc;
2393
2394         dout("  snap context seq = %llu, snap_count = %u\n",
2395                 (unsigned long long) seq, (unsigned int) snap_count);
2396
2397 out:
2398         kfree(reply_buf);
2399
2400         return 0;
2401 }
2402
2403 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2404 {
2405         size_t size;
2406         void *reply_buf;
2407         __le64 snap_id;
2408         int ret;
2409         void *p;
2410         void *end;
2411         size_t snap_name_len;
2412         char *snap_name;
2413
2414         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2415         reply_buf = kmalloc(size, GFP_KERNEL);
2416         if (!reply_buf)
2417                 return ERR_PTR(-ENOMEM);
2418
2419         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2420         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2421                                 "rbd", "get_snapshot_name",
2422                                 (char *) &snap_id, sizeof (snap_id),
2423                                 reply_buf, size,
2424                                 CEPH_OSD_FLAG_READ, NULL);
2425         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2426         if (ret < 0)
2427                 goto out;
2428
2429         p = reply_buf;
2430         end = (char *) reply_buf + size;
2431         snap_name_len = 0;
2432         snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2433                                 GFP_KERNEL);
2434         if (IS_ERR(snap_name)) {
2435                 ret = PTR_ERR(snap_name);
2436                 goto out;
2437         } else {
2438                 dout("  snap_id 0x%016llx snap_name = %s\n",
2439                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2440         }
2441         kfree(reply_buf);
2442
2443         return snap_name;
2444 out:
2445         kfree(reply_buf);
2446
2447         return ERR_PTR(ret);
2448 }
2449
2450 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2451                 u64 *snap_size, u64 *snap_features)
2452 {
2453         __le64 snap_id;
2454         u8 order;
2455         int ret;
2456
2457         snap_id = rbd_dev->header.snapc->snaps[which];
2458         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2459         if (ret)
2460                 return ERR_PTR(ret);
2461         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2462         if (ret)
2463                 return ERR_PTR(ret);
2464
2465         return rbd_dev_v2_snap_name(rbd_dev, which);
2466 }
2467
2468 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2469                 u64 *snap_size, u64 *snap_features)
2470 {
2471         if (rbd_dev->image_format == 1)
2472                 return rbd_dev_v1_snap_info(rbd_dev, which,
2473                                         snap_size, snap_features);
2474         if (rbd_dev->image_format == 2)
2475                 return rbd_dev_v2_snap_info(rbd_dev, which,
2476                                         snap_size, snap_features);
2477         return ERR_PTR(-EINVAL);
2478 }
2479
2480 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2481 {
2482         int ret;
2483         __u8 obj_order;
2484
2485         down_write(&rbd_dev->header_rwsem);
2486
2487         /* Grab old order first, to see if it changes */
2488
2489         obj_order = rbd_dev->header.obj_order,
2490         ret = rbd_dev_v2_image_size(rbd_dev);
2491         if (ret)
2492                 goto out;
2493         if (rbd_dev->header.obj_order != obj_order) {
2494                 ret = -EIO;
2495                 goto out;
2496         }
2497         rbd_update_mapping_size(rbd_dev);
2498
2499         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2500         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2501         if (ret)
2502                 goto out;
2503         ret = rbd_dev_snaps_update(rbd_dev);
2504         dout("rbd_dev_snaps_update returned %d\n", ret);
2505         if (ret)
2506                 goto out;
2507         ret = rbd_dev_snaps_register(rbd_dev);
2508         dout("rbd_dev_snaps_register returned %d\n", ret);
2509 out:
2510         up_write(&rbd_dev->header_rwsem);
2511
2512         return ret;
2513 }
2514
2515 /*
2516  * Scan the rbd device's current snapshot list and compare it to the
2517  * newly-received snapshot context.  Remove any existing snapshots
2518  * not present in the new snapshot context.  Add a new snapshot for
2519  * any snaphots in the snapshot context not in the current list.
2520  * And verify there are no changes to snapshots we already know
2521  * about.
2522  *
2523  * Assumes the snapshots in the snapshot context are sorted by
2524  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2525  * are also maintained in that order.)
2526  */
2527 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2528 {
2529         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2530         const u32 snap_count = snapc->num_snaps;
2531         struct list_head *head = &rbd_dev->snaps;
2532         struct list_head *links = head->next;
2533         u32 index = 0;
2534
2535         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2536         while (index < snap_count || links != head) {
2537                 u64 snap_id;
2538                 struct rbd_snap *snap;
2539                 char *snap_name;
2540                 u64 snap_size = 0;
2541                 u64 snap_features = 0;
2542
2543                 snap_id = index < snap_count ? snapc->snaps[index]
2544                                              : CEPH_NOSNAP;
2545                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2546                                      : NULL;
2547                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2548
2549                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2550                         struct list_head *next = links->next;
2551
2552                         /* Existing snapshot not in the new snap context */
2553
2554                         if (rbd_dev->snap_id == snap->id)
2555                                 rbd_dev->exists = false;
2556                         rbd_remove_snap_dev(snap);
2557                         dout("%ssnap id %llu has been removed\n",
2558                                 rbd_dev->snap_id == snap->id ?  "mapped " : "",
2559                                 (unsigned long long) snap->id);
2560
2561                         /* Done with this list entry; advance */
2562
2563                         links = next;
2564                         continue;
2565                 }
2566
2567                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2568                                         &snap_size, &snap_features);
2569                 if (IS_ERR(snap_name))
2570                         return PTR_ERR(snap_name);
2571
2572                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2573                         (unsigned long long) snap_id);
2574                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2575                         struct rbd_snap *new_snap;
2576
2577                         /* We haven't seen this snapshot before */
2578
2579                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2580                                         snap_id, snap_size, snap_features);
2581                         if (IS_ERR(new_snap)) {
2582                                 int err = PTR_ERR(new_snap);
2583
2584                                 dout("  failed to add dev, error %d\n", err);
2585
2586                                 return err;
2587                         }
2588
2589                         /* New goes before existing, or at end of list */
2590
2591                         dout("  added dev%s\n", snap ? "" : " at end\n");
2592                         if (snap)
2593                                 list_add_tail(&new_snap->node, &snap->node);
2594                         else
2595                                 list_add_tail(&new_snap->node, head);
2596                 } else {
2597                         /* Already have this one */
2598
2599                         dout("  already present\n");
2600
2601                         rbd_assert(snap->size == snap_size);
2602                         rbd_assert(!strcmp(snap->name, snap_name));
2603                         rbd_assert(snap->features == snap_features);
2604
2605                         /* Done with this list entry; advance */
2606
2607                         links = links->next;
2608                 }
2609
2610                 /* Advance to the next entry in the snapshot context */
2611
2612                 index++;
2613         }
2614         dout("%s: done\n", __func__);
2615
2616         return 0;
2617 }
2618
2619 /*
2620  * Scan the list of snapshots and register the devices for any that
2621  * have not already been registered.
2622  */
2623 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2624 {
2625         struct rbd_snap *snap;
2626         int ret = 0;
2627
2628         dout("%s called\n", __func__);
2629         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2630                 return -EIO;
2631
2632         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2633                 if (!rbd_snap_registered(snap)) {
2634                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2635                         if (ret < 0)
2636                                 break;
2637                 }
2638         }
2639         dout("%s: returning %d\n", __func__, ret);
2640
2641         return ret;
2642 }
2643
2644 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2645 {
2646         struct device *dev;
2647         int ret;
2648
2649         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2650
2651         dev = &rbd_dev->dev;
2652         dev->bus = &rbd_bus_type;
2653         dev->type = &rbd_device_type;
2654         dev->parent = &rbd_root_dev;
2655         dev->release = rbd_dev_release;
2656         dev_set_name(dev, "%d", rbd_dev->dev_id);
2657         ret = device_register(dev);
2658
2659         mutex_unlock(&ctl_mutex);
2660
2661         return ret;
2662 }
2663
2664 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2665 {
2666         device_unregister(&rbd_dev->dev);
2667 }
2668
2669 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2670 {
2671         int ret, rc;
2672
2673         do {
2674                 ret = rbd_req_sync_watch(rbd_dev);
2675                 if (ret == -ERANGE) {
2676                         rc = rbd_dev_refresh(rbd_dev, NULL);
2677                         if (rc < 0)
2678                                 return rc;
2679                 }
2680         } while (ret == -ERANGE);
2681
2682         return ret;
2683 }
2684
/*
 * Highest rbd device id handed out so far.  Incremented by
 * rbd_dev_id_get() and possibly lowered by rbd_dev_id_put().
 */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2686
2687 /*
2688  * Get a unique rbd identifier for the given new rbd_dev, and add
2689  * the rbd_dev to the global list.  The minimum rbd id is 1.
2690  */
2691 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2692 {
2693         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2694
2695         spin_lock(&rbd_dev_list_lock);
2696         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2697         spin_unlock(&rbd_dev_list_lock);
2698         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2699                 (unsigned long long) rbd_dev->dev_id);
2700 }
2701
2702 /*
2703  * Remove an rbd_dev from the global list, and record that its
2704  * identifier is no longer in use.
2705  */
2706 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2707 {
2708         struct list_head *tmp;
2709         int rbd_id = rbd_dev->dev_id;
2710         int max_id;
2711
2712         rbd_assert(rbd_id > 0);
2713
2714         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2715                 (unsigned long long) rbd_dev->dev_id);
2716         spin_lock(&rbd_dev_list_lock);
2717         list_del_init(&rbd_dev->node);
2718
2719         /*
2720          * If the id being "put" is not the current maximum, there
2721          * is nothing special we need to do.
2722          */
2723         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2724                 spin_unlock(&rbd_dev_list_lock);
2725                 return;
2726         }
2727
2728         /*
2729          * We need to update the current maximum id.  Search the
2730          * list to find out what it is.  We're more likely to find
2731          * the maximum at the end, so search the list backward.
2732          */
2733         max_id = 0;
2734         list_for_each_prev(tmp, &rbd_dev_list) {
2735                 struct rbd_device *rbd_dev;
2736
2737                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2738                 if (rbd_dev->dev_id > max_id)
2739                         max_id = rbd_dev->dev_id;
2740         }
2741         spin_unlock(&rbd_dev_list_lock);
2742
2743         /*
2744          * The max id could have been updated by rbd_dev_id_get(), in
2745          * which case it now accurately reflects the new maximum.
2746          * Be careful not to overwrite the maximum value in that
2747          * case.
2748          */
2749         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2750         dout("  max dev id has been reset\n");
2751 }
2752
/*
 * Advances *buf past any leading white space so that it points at
 * the first character of the next token (or at the terminating '\0'
 * if there is none).  Returns the number of non-white space
 * characters making up that token, which is 0 when no token remains.
 * The string at *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* Skip to the token */
	*buf = start;

	return strcspn(start, whitespace);	/* Measure the token */
}
2771
/*
 * Locates the next token in *buf and, provided it fits (including
 * its terminating '\0') in the token buffer of token_size bytes,
 * copies it there and terminates it.  The string at *buf must be
 * '\0'-terminated on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means there was no token; a return >= token_size
 * means the token did not fit and the token buffer was not written.
 *
 * In every case *buf is left pointing just past the token found,
 * even when the token was too long to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *start = *buf;

	*buf = start + token_len;
	if (token_len >= token_size)
		return token_len;	/* No room; caller must check */

	memcpy(token, start, token_len);
	token[token_len] = '\0';

	return token_len;
}
2801
2802 /*
2803  * Finds the next token in *buf, dynamically allocates a buffer big
2804  * enough to hold a copy of it, and copies the token into the new
2805  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2806  * that a duplicate buffer is created even for a zero-length token.
2807  *
2808  * Returns a pointer to the newly-allocated duplicate, or a null
2809  * pointer if memory for the duplicate was not available.  If
2810  * the lenp argument is a non-null pointer, the length of the token
2811  * (not including the '\0') is returned in *lenp.
2812  *
2813  * If successful, the *buf pointer will be updated to point beyond
2814  * the end of the found token.
2815  *
2816  * Note: uses GFP_KERNEL for allocation.
2817  */
2818 static inline char *dup_token(const char **buf, size_t *lenp)
2819 {
2820         char *dup;
2821         size_t len;
2822
2823         len = next_token(buf);
2824         dup = kmalloc(len + 1, GFP_KERNEL);
2825         if (!dup)
2826                 return NULL;
2827
2828         memcpy(dup, *buf, len);
2829         *(dup + len) = '\0';
2830         *buf += len;
2831
2832         if (lenp)
2833                 *lenp = len;
2834
2835         return dup;
2836 }
2837
2838 /*
2839  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2840  * rbd_md_name, and name fields of the given rbd_dev, based on the
2841  * list of monitor addresses and other options provided via
2842  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2843  * copy of the snapshot name to map if successful, or a
2844  * pointer-coded error otherwise.
2845  *
2846  * Note: rbd_dev is assumed to have been initially zero-filled.
2847  */
2848 static struct ceph_options *rbd_add_parse_args(struct rbd_device *rbd_dev,
2849                                                 const char *buf,
2850                                                 char *options,
2851                                                 size_t options_size,
2852                                                 char **snap_name,
2853                                                 size_t *snap_name_len)
2854 {
2855         size_t len;
2856         const char *mon_addrs;
2857         size_t mon_addrs_size;
2858         struct rbd_options rbd_opts;
2859         struct ceph_options *ceph_opts;
2860         struct ceph_options *err_ptr = ERR_PTR(-EINVAL);
2861
2862         /* The first four tokens are required */
2863
2864         len = next_token(&buf);
2865         if (!len)
2866                 return err_ptr;
2867         mon_addrs_size = len + 1;
2868         mon_addrs = buf;
2869
2870         buf += len;
2871
2872         len = copy_token(&buf, options, options_size);
2873         if (!len || len >= options_size)
2874                 return err_ptr;
2875
2876         err_ptr = ERR_PTR(-ENOMEM);
2877         rbd_dev->pool_name = dup_token(&buf, NULL);
2878         if (!rbd_dev->pool_name)
2879                 goto out_err;
2880
2881         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2882         if (!rbd_dev->image_name)
2883                 goto out_err;
2884
2885         /* Snapshot name is optional; default is to use "head" */
2886
2887         len = next_token(&buf);
2888         if (len > RBD_MAX_SNAP_NAME_LEN) {
2889                 err_ptr = ERR_PTR(-ENAMETOOLONG);
2890                 goto out_err;
2891         }
2892         if (!len) {
2893                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2894                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2895         }
2896         *snap_name = kmalloc(len + 1, GFP_KERNEL);
2897         if (!*snap_name)
2898                 goto out_err;
2899         memcpy(*snap_name, buf, len);
2900         *(*snap_name + len) = '\0';
2901         *snap_name_len = len;
2902         /* Initialize all rbd options to the defaults */
2903
2904         rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
2905
2906         ceph_opts = ceph_parse_options(options, mon_addrs,
2907                                         mon_addrs + mon_addrs_size - 1,
2908                                         parse_rbd_opts_token, &rbd_opts);
2909
2910         /* Record the parsed rbd options */
2911
2912         if (!IS_ERR(ceph_opts)) {
2913                 rbd_dev->mapping.read_only = rbd_opts.read_only;
2914         }
2915
2916         return ceph_opts;
2917 out_err:
2918         kfree(rbd_dev->image_name);
2919         rbd_dev->image_name = NULL;
2920         rbd_dev->image_name_len = 0;
2921         kfree(rbd_dev->pool_name);
2922         rbd_dev->pool_name = NULL;
2923
2924         return err_ptr;
2925 }
2926
2927 /*
2928  * An rbd format 2 image has a unique identifier, distinct from the
2929  * name given to it by the user.  Internally, that identifier is
2930  * what's used to specify the names of objects related to the image.
2931  *
2932  * A special "rbd id" object is used to map an rbd image name to its
2933  * id.  If that object doesn't exist, then there is no v2 rbd image
2934  * with the supplied name.
2935  *
2936  * This function will record the given rbd_dev's image_id field if
2937  * it can be determined, and in that case will return 0.  If any
2938  * errors occur a negative errno will be returned and the rbd_dev's
2939  * image_id field will be unchanged (and should be NULL).
2940  */
2941 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2942 {
2943         int ret;
2944         size_t size;
2945         char *object_name;
2946         void *response;
2947         void *p;
2948
2949         /*
2950          * First, see if the format 2 image id file exists, and if
2951          * so, get the image's persistent id from it.
2952          */
2953         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2954         object_name = kmalloc(size, GFP_NOIO);
2955         if (!object_name)
2956                 return -ENOMEM;
2957         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2958         dout("rbd id object name is %s\n", object_name);
2959
2960         /* Response will be an encoded string, which includes a length */
2961
2962         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2963         response = kzalloc(size, GFP_NOIO);
2964         if (!response) {
2965                 ret = -ENOMEM;
2966                 goto out;
2967         }
2968
2969         ret = rbd_req_sync_exec(rbd_dev, object_name,
2970                                 "rbd", "get_id",
2971                                 NULL, 0,
2972                                 response, RBD_IMAGE_ID_LEN_MAX,
2973                                 CEPH_OSD_FLAG_READ, NULL);
2974         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2975         if (ret < 0)
2976                 goto out;
2977         ret = 0;    /* rbd_req_sync_exec() can return positive */
2978
2979         p = response;
2980         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2981                                                 p + RBD_IMAGE_ID_LEN_MAX,
2982                                                 &rbd_dev->image_id_len,
2983                                                 GFP_NOIO);
2984         if (IS_ERR(rbd_dev->image_id)) {
2985                 ret = PTR_ERR(rbd_dev->image_id);
2986                 rbd_dev->image_id = NULL;
2987         } else {
2988                 dout("image_id is %s\n", rbd_dev->image_id);
2989         }
2990 out:
2991         kfree(response);
2992         kfree(object_name);
2993
2994         return ret;
2995 }
2996
2997 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2998 {
2999         int ret;
3000         size_t size;
3001
3002         /* Version 1 images have no id; empty string is used */
3003
3004         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3005         if (!rbd_dev->image_id)
3006                 return -ENOMEM;
3007         rbd_dev->image_id_len = 0;
3008
3009         /* Record the header object name for this rbd image. */
3010
3011         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3012         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3013         if (!rbd_dev->header_name) {
3014                 ret = -ENOMEM;
3015                 goto out_err;
3016         }
3017         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3018
3019         /* Populate rbd image metadata */
3020
3021         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3022         if (ret < 0)
3023                 goto out_err;
3024         rbd_dev->image_format = 1;
3025
3026         dout("discovered version 1 image, header name is %s\n",
3027                 rbd_dev->header_name);
3028
3029         return 0;
3030
3031 out_err:
3032         kfree(rbd_dev->header_name);
3033         rbd_dev->header_name = NULL;
3034         kfree(rbd_dev->image_id);
3035         rbd_dev->image_id = NULL;
3036
3037         return ret;
3038 }
3039
3040 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3041 {
3042         size_t size;
3043         int ret;
3044         u64 ver = 0;
3045
3046         /*
3047          * Image id was filled in by the caller.  Record the header
3048          * object name for this rbd image.
3049          */
3050         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3051         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3052         if (!rbd_dev->header_name)
3053                 return -ENOMEM;
3054         sprintf(rbd_dev->header_name, "%s%s",
3055                         RBD_HEADER_PREFIX, rbd_dev->image_id);
3056
3057         /* Get the size and object order for the image */
3058
3059         ret = rbd_dev_v2_image_size(rbd_dev);
3060         if (ret < 0)
3061                 goto out_err;
3062
3063         /* Get the object prefix (a.k.a. block_name) for the image */
3064
3065         ret = rbd_dev_v2_object_prefix(rbd_dev);
3066         if (ret < 0)
3067                 goto out_err;
3068
3069         /* Get the and check features for the image */
3070
3071         ret = rbd_dev_v2_features(rbd_dev);
3072         if (ret < 0)
3073                 goto out_err;
3074
3075         /* crypto and compression type aren't (yet) supported for v2 images */
3076
3077         rbd_dev->header.crypt_type = 0;
3078         rbd_dev->header.comp_type = 0;
3079
3080         /* Get the snapshot context, plus the header version */
3081
3082         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3083         if (ret)
3084                 goto out_err;
3085         rbd_dev->header.obj_version = ver;
3086
3087         rbd_dev->image_format = 2;
3088
3089         dout("discovered version 2 image, header name is %s\n",
3090                 rbd_dev->header_name);
3091
3092         return 0;
3093 out_err:
3094         kfree(rbd_dev->header_name);
3095         rbd_dev->header_name = NULL;
3096         kfree(rbd_dev->header.object_prefix);
3097         rbd_dev->header.object_prefix = NULL;
3098
3099         return ret;
3100 }
3101
/*
 * Probe for the header object of the given rbd device, working out
 * the image format along the way.
 *
 * The image id object is consulted first.  If the id is found the
 * image is probed as format 2.  Any failure to get the id (the
 * expected one being ENOENT for an image that is not format 2)
 * makes us fall back to probing it as a format 1 image.
 *
 * Returns the result of whichever format-specific probe was run.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);

	if (!ret)
		return 0;

	dout("probe failed, returning %d\n", ret);

	return ret;
}
3126
3127 static ssize_t rbd_add(struct bus_type *bus,
3128                        const char *buf,
3129                        size_t count)
3130 {
3131         char *options;
3132         struct rbd_device *rbd_dev = NULL;
3133         char *snap_name;
3134         size_t snap_name_len = 0;
3135         struct ceph_options *ceph_opts;
3136         struct ceph_osd_client *osdc;
3137         int rc = -ENOMEM;
3138
3139         if (!try_module_get(THIS_MODULE))
3140                 return -ENODEV;
3141
3142         options = kmalloc(count, GFP_KERNEL);
3143         if (!options)
3144                 goto err_out_mem;
3145         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3146         if (!rbd_dev)
3147                 goto err_out_mem;
3148
3149         /* static rbd_device initialization */
3150         spin_lock_init(&rbd_dev->lock);
3151         INIT_LIST_HEAD(&rbd_dev->node);
3152         INIT_LIST_HEAD(&rbd_dev->snaps);
3153         init_rwsem(&rbd_dev->header_rwsem);
3154
3155         /* parse add command */
3156         ceph_opts = rbd_add_parse_args(rbd_dev, buf, options, count,
3157                                 &snap_name, &snap_name_len);
3158         if (IS_ERR(ceph_opts)) {
3159                 rc = PTR_ERR(ceph_opts);
3160                 goto err_out_mem;
3161         }
3162
3163         rc = rbd_get_client(rbd_dev, ceph_opts);
3164         if (rc < 0)
3165                 goto err_out_args;
3166         ceph_opts = NULL;       /* ceph_opts now owned by rbd_dev client */
3167
3168         /* pick the pool */
3169         osdc = &rbd_dev->rbd_client->client->osdc;
3170         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3171         if (rc < 0)
3172                 goto err_out_client;
3173         rbd_dev->pool_id = (u64) rc;
3174
3175         rc = rbd_dev_probe(rbd_dev);
3176         if (rc < 0)
3177                 goto err_out_client;
3178
3179         /* no need to lock here, as rbd_dev is not registered yet */
3180         rc = rbd_dev_snaps_update(rbd_dev);
3181         if (rc)
3182                 goto err_out_probe;
3183
3184         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3185         if (rc)
3186                 goto err_out_snaps;
3187
3188         /* generate unique id: find highest unique id, add one */
3189         rbd_dev_id_get(rbd_dev);
3190
3191         /* Fill in the device name, now that we have its id. */
3192         BUILD_BUG_ON(DEV_NAME_LEN
3193                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3194         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3195
3196         /* Get our block major device number. */
3197
3198         rc = register_blkdev(0, rbd_dev->name);
3199         if (rc < 0)
3200                 goto err_out_id;
3201         rbd_dev->major = rc;
3202
3203         /* Set up the blkdev mapping. */
3204
3205         rc = rbd_init_disk(rbd_dev);
3206         if (rc)
3207                 goto err_out_blkdev;
3208
3209         rc = rbd_bus_add_dev(rbd_dev);
3210         if (rc)
3211                 goto err_out_disk;
3212
3213         /*
3214          * At this point cleanup in the event of an error is the job
3215          * of the sysfs code (initiated by rbd_bus_del_dev()).
3216          */
3217
3218         down_write(&rbd_dev->header_rwsem);
3219         rc = rbd_dev_snaps_register(rbd_dev);
3220         up_write(&rbd_dev->header_rwsem);
3221         if (rc)
3222                 goto err_out_bus;
3223
3224         rc = rbd_init_watch_dev(rbd_dev);
3225         if (rc)
3226                 goto err_out_bus;
3227
3228         /* Everything's ready.  Announce the disk to the world. */
3229
3230         add_disk(rbd_dev->disk);
3231
3232         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3233                 (unsigned long long) rbd_dev->mapping.size);
3234
3235         return count;
3236
3237 err_out_bus:
3238         /* this will also clean up rest of rbd_dev stuff */
3239
3240         rbd_bus_del_dev(rbd_dev);
3241         kfree(options);
3242         return rc;
3243
3244 err_out_disk:
3245         rbd_free_disk(rbd_dev);
3246 err_out_blkdev:
3247         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3248 err_out_id:
3249         rbd_dev_id_put(rbd_dev);
3250 err_out_snaps:
3251         rbd_remove_all_snaps(rbd_dev);
3252 err_out_probe:
3253         rbd_header_free(&rbd_dev->header);
3254 err_out_client:
3255         kfree(rbd_dev->header_name);
3256         rbd_put_client(rbd_dev);
3257         kfree(rbd_dev->image_id);
3258 err_out_args:
3259         if (ceph_opts)
3260                 ceph_destroy_options(ceph_opts);
3261         kfree(rbd_dev->snap_name);
3262         kfree(rbd_dev->image_name);
3263         kfree(rbd_dev->pool_name);
3264 err_out_mem:
3265         kfree(rbd_dev);
3266         kfree(options);
3267
3268         dout("Error adding device %s\n", buf);
3269         module_put(THIS_MODULE);
3270
3271         return (ssize_t) rc;
3272 }
3273
3274 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3275 {
3276         struct list_head *tmp;
3277         struct rbd_device *rbd_dev;
3278
3279         spin_lock(&rbd_dev_list_lock);
3280         list_for_each(tmp, &rbd_dev_list) {
3281                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3282                 if (rbd_dev->dev_id == dev_id) {
3283                         spin_unlock(&rbd_dev_list_lock);
3284                         return rbd_dev;
3285                 }
3286         }
3287         spin_unlock(&rbd_dev_list_lock);
3288         return NULL;
3289 }
3290
3291 static void rbd_dev_release(struct device *dev)
3292 {
3293         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3294
3295         if (rbd_dev->watch_request) {
3296                 struct ceph_client *client = rbd_dev->rbd_client->client;
3297
3298                 ceph_osdc_unregister_linger_request(&client->osdc,
3299                                                     rbd_dev->watch_request);
3300         }
3301         if (rbd_dev->watch_event)
3302                 rbd_req_sync_unwatch(rbd_dev);
3303
3304         rbd_put_client(rbd_dev);
3305
3306         /* clean up and free blkdev */
3307         rbd_free_disk(rbd_dev);
3308         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3309
3310         /* release allocated disk header fields */
3311         rbd_header_free(&rbd_dev->header);
3312
3313         /* done with the id, and with the rbd_dev */
3314         kfree(rbd_dev->snap_name);
3315         kfree(rbd_dev->image_id);
3316         kfree(rbd_dev->header_name);
3317         kfree(rbd_dev->pool_name);
3318         kfree(rbd_dev->image_name);
3319         rbd_dev_id_put(rbd_dev);
3320         kfree(rbd_dev);
3321
3322         /* release module ref */
3323         module_put(THIS_MODULE);
3324 }
3325
3326 static ssize_t rbd_remove(struct bus_type *bus,
3327                           const char *buf,
3328                           size_t count)
3329 {
3330         struct rbd_device *rbd_dev = NULL;
3331         int target_id, rc;
3332         unsigned long ul;
3333         int ret = count;
3334
3335         rc = strict_strtoul(buf, 10, &ul);
3336         if (rc)
3337                 return rc;
3338
3339         /* convert to int; abort if we lost anything in the conversion */
3340         target_id = (int) ul;
3341         if (target_id != ul)
3342                 return -EINVAL;
3343
3344         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3345
3346         rbd_dev = __rbd_get_dev(target_id);
3347         if (!rbd_dev) {
3348                 ret = -ENOENT;
3349                 goto done;
3350         }
3351
3352         rbd_remove_all_snaps(rbd_dev);
3353         rbd_bus_del_dev(rbd_dev);
3354
3355 done:
3356         mutex_unlock(&ctl_mutex);
3357
3358         return ret;
3359 }
3360
3361 /*
3362  * create control files in sysfs
3363  * /sys/bus/rbd/...
3364  */
3365 static int rbd_sysfs_init(void)
3366 {
3367         int ret;
3368
3369         ret = device_register(&rbd_root_dev);
3370         if (ret < 0)
3371                 return ret;
3372
3373         ret = bus_register(&rbd_bus_type);
3374         if (ret < 0)
3375                 device_unregister(&rbd_root_dev);
3376
3377         return ret;
3378 }
3379
3380 static void rbd_sysfs_cleanup(void)
3381 {
3382         bus_unregister(&rbd_bus_type);
3383         device_unregister(&rbd_root_dev);
3384 }
3385
3386 int __init rbd_init(void)
3387 {
3388         int rc;
3389
3390         rc = rbd_sysfs_init();
3391         if (rc)
3392                 return rc;
3393         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3394         return 0;
3395 }
3396
3397 void __exit rbd_exit(void)
3398 {
3399         rbd_sysfs_cleanup();
3400 }
3401
/* Register the module's entry and exit points */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");