]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
839ab730a1f3dafddd93bdbe1d14e708902f7c0d
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         u32 total_snaps;
85
86         char *snap_names;
87         u64 *snap_sizes;
88
89         u64 obj_version;
90 };
91
92 struct rbd_options {
93         int     notify_timeout;
94 };
95
96 /*
97  * an instance of the client.  multiple devices may share an rbd client.
98  */
99 struct rbd_client {
100         struct ceph_client      *client;
101         struct kref             kref;
102         struct list_head        node;
103 };
104
105 /*
106  * a request completion status
107  */
108 struct rbd_req_status {
109         int done;
110         int rc;
111         u64 bytes;
112 };
113
114 /*
115  * a collection of requests
116  */
117 struct rbd_req_coll {
118         int                     total;
119         int                     num_done;
120         struct kref             kref;
121         struct rbd_req_status   status[0];
122 };
123
124 /*
125  * a single io request
126  */
127 struct rbd_request {
128         struct request          *rq;            /* blk layer request */
129         struct bio              *bio;           /* cloned bio */
130         struct page             **pages;        /* list of used pages */
131         u64                     len;
132         int                     coll_index;
133         struct rbd_req_coll     *coll;
134 };
135
136 struct rbd_snap {
137         struct  device          dev;
138         const char              *name;
139         u64                     size;
140         struct list_head        node;
141         u64                     id;
142 };
143
144 /*
145  * a single device
146  */
147 struct rbd_device {
148         int                     dev_id;         /* blkdev unique id */
149
150         int                     major;          /* blkdev assigned major */
151         struct gendisk          *disk;          /* blkdev's gendisk and rq */
152         struct request_queue    *q;
153
154         struct rbd_options      rbd_opts;
155         struct rbd_client       *rbd_client;
156
157         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
158
159         spinlock_t              lock;           /* queue lock */
160
161         struct rbd_image_header header;
162         char                    *image_name;
163         size_t                  image_name_len;
164         char                    *header_name;
165         char                    *pool_name;
166         int                     pool_id;
167
168         struct ceph_osd_event   *watch_event;
169         struct ceph_osd_request *watch_request;
170
171         /* protects updating the header */
172         struct rw_semaphore     header_rwsem;
173         /* name of the snapshot this device reads from */
174         char                    *snap_name;
175         /* id of the snapshot this device reads from */
176         u64                     snap_id;        /* current snapshot id */
177         /* whether the snap_id this device reads from still exists */
178         bool                    snap_exists;
179         int                     read_only;
180
181         struct list_head        node;
182
183         /* list of snapshots */
184         struct list_head        snaps;
185
186         /* sysfs related */
187         struct device           dev;
188 };
189
190 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
191
192 static LIST_HEAD(rbd_dev_list);    /* devices */
193 static DEFINE_SPINLOCK(rbd_dev_list_lock);
194
195 static LIST_HEAD(rbd_client_list);              /* clients */
196 static DEFINE_SPINLOCK(rbd_client_list_lock);
197
198 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199 static void rbd_dev_release(struct device *dev);
200 static ssize_t rbd_snap_add(struct device *dev,
201                             struct device_attribute *attr,
202                             const char *buf,
203                             size_t count);
204 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
205
206 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207                        size_t count);
208 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
209                           size_t count);
210
211 static struct bus_attribute rbd_bus_attrs[] = {
212         __ATTR(add, S_IWUSR, NULL, rbd_add),
213         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
214         __ATTR_NULL
215 };
216
217 static struct bus_type rbd_bus_type = {
218         .name           = "rbd",
219         .bus_attrs      = rbd_bus_attrs,
220 };
221
222 static void rbd_root_dev_release(struct device *dev)
223 {
224 }
225
226 static struct device rbd_root_dev = {
227         .init_name =    "rbd",
228         .release =      rbd_root_dev_release,
229 };
230
231
232 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233 {
234         return get_device(&rbd_dev->dev);
235 }
236
237 static void rbd_put_dev(struct rbd_device *rbd_dev)
238 {
239         put_device(&rbd_dev->dev);
240 }
241
242 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
243
244 static int rbd_open(struct block_device *bdev, fmode_t mode)
245 {
246         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
247
248         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
249                 return -EROFS;
250
251         rbd_get_dev(rbd_dev);
252         set_device_ro(bdev, rbd_dev->read_only);
253
254         return 0;
255 }
256
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
258 {
259         struct rbd_device *rbd_dev = disk->private_data;
260
261         rbd_put_dev(rbd_dev);
262
263         return 0;
264 }
265
266 static const struct block_device_operations rbd_bd_ops = {
267         .owner                  = THIS_MODULE,
268         .open                   = rbd_open,
269         .release                = rbd_release,
270 };
271
272 /*
273  * Initialize an rbd client instance.
274  * We own *ceph_opts.
275  */
276 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
277 {
278         struct rbd_client *rbdc;
279         int ret = -ENOMEM;
280
281         dout("rbd_client_create\n");
282         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
283         if (!rbdc)
284                 goto out_opt;
285
286         kref_init(&rbdc->kref);
287         INIT_LIST_HEAD(&rbdc->node);
288
289         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
290
291         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
292         if (IS_ERR(rbdc->client))
293                 goto out_mutex;
294         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
295
296         ret = ceph_open_session(rbdc->client);
297         if (ret < 0)
298                 goto out_err;
299
300         spin_lock(&rbd_client_list_lock);
301         list_add_tail(&rbdc->node, &rbd_client_list);
302         spin_unlock(&rbd_client_list_lock);
303
304         mutex_unlock(&ctl_mutex);
305
306         dout("rbd_client_create created %p\n", rbdc);
307         return rbdc;
308
309 out_err:
310         ceph_destroy_client(rbdc->client);
311 out_mutex:
312         mutex_unlock(&ctl_mutex);
313         kfree(rbdc);
314 out_opt:
315         if (ceph_opts)
316                 ceph_destroy_options(ceph_opts);
317         return ERR_PTR(ret);
318 }
319
320 /*
321  * Find a ceph client with specific addr and configuration.  If
322  * found, bump its reference count.
323  */
324 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
325 {
326         struct rbd_client *client_node;
327         bool found = false;
328
329         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
330                 return NULL;
331
332         spin_lock(&rbd_client_list_lock);
333         list_for_each_entry(client_node, &rbd_client_list, node) {
334                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
335                         kref_get(&client_node->kref);
336                         found = true;
337                         break;
338                 }
339         }
340         spin_unlock(&rbd_client_list_lock);
341
342         return found ? client_node : NULL;
343 }
344
345 /*
346  * mount options
347  */
348 enum {
349         Opt_notify_timeout,
350         Opt_last_int,
351         /* int args above */
352         Opt_last_string,
353         /* string args above */
354 };
355
356 static match_table_t rbd_opts_tokens = {
357         {Opt_notify_timeout, "notify_timeout=%d"},
358         /* int args above */
359         /* string args above */
360         {-1, NULL}
361 };
362
363 static int parse_rbd_opts_token(char *c, void *private)
364 {
365         struct rbd_options *rbd_opts = private;
366         substring_t argstr[MAX_OPT_ARGS];
367         int token, intval, ret;
368
369         token = match_token(c, rbd_opts_tokens, argstr);
370         if (token < 0)
371                 return -EINVAL;
372
373         if (token < Opt_last_int) {
374                 ret = match_int(&argstr[0], &intval);
375                 if (ret < 0) {
376                         pr_err("bad mount option arg (not int) "
377                                "at '%s'\n", c);
378                         return ret;
379                 }
380                 dout("got int token %d val %d\n", token, intval);
381         } else if (token > Opt_last_int && token < Opt_last_string) {
382                 dout("got string token %d val %s\n", token,
383                      argstr[0].from);
384         } else {
385                 dout("got token %d\n", token);
386         }
387
388         switch (token) {
389         case Opt_notify_timeout:
390                 rbd_opts->notify_timeout = intval;
391                 break;
392         default:
393                 BUG_ON(token);
394         }
395         return 0;
396 }
397
398 /*
399  * Get a ceph client with specific addr and configuration, if one does
400  * not exist create it.
401  */
402 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
403                                 size_t mon_addr_len, char *options)
404 {
405         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
406         struct ceph_options *ceph_opts;
407         struct rbd_client *rbdc;
408
409         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410
411         ceph_opts = ceph_parse_options(options, mon_addr,
412                                         mon_addr + mon_addr_len,
413                                         parse_rbd_opts_token, rbd_opts);
414         if (IS_ERR(ceph_opts))
415                 return PTR_ERR(ceph_opts);
416
417         rbdc = rbd_client_find(ceph_opts);
418         if (rbdc) {
419                 /* using an existing client */
420                 ceph_destroy_options(ceph_opts);
421         } else {
422                 rbdc = rbd_client_create(ceph_opts);
423                 if (IS_ERR(rbdc))
424                         return PTR_ERR(rbdc);
425         }
426         rbd_dev->rbd_client = rbdc;
427
428         return 0;
429 }
430
431 /*
432  * Destroy ceph client
433  *
434  * Caller must hold rbd_client_list_lock.
435  */
436 static void rbd_client_release(struct kref *kref)
437 {
438         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
439
440         dout("rbd_release_client %p\n", rbdc);
441         spin_lock(&rbd_client_list_lock);
442         list_del(&rbdc->node);
443         spin_unlock(&rbd_client_list_lock);
444
445         ceph_destroy_client(rbdc->client);
446         kfree(rbdc);
447 }
448
449 /*
450  * Drop reference to ceph client node. If it's not referenced anymore, release
451  * it.
452  */
453 static void rbd_put_client(struct rbd_device *rbd_dev)
454 {
455         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
456         rbd_dev->rbd_client = NULL;
457 }
458
459 /*
460  * Destroy requests collection
461  */
462 static void rbd_coll_release(struct kref *kref)
463 {
464         struct rbd_req_coll *coll =
465                 container_of(kref, struct rbd_req_coll, kref);
466
467         dout("rbd_coll_release %p\n", coll);
468         kfree(coll);
469 }
470
471 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
472 {
473         size_t size;
474         u32 snap_count;
475
476         /* The header has to start with the magic rbd header text */
477         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
478                 return false;
479
480         /*
481          * The size of a snapshot header has to fit in a size_t, and
482          * that limits the number of snapshots.
483          */
484         snap_count = le32_to_cpu(ondisk->snap_count);
485         size = SIZE_MAX - sizeof (struct ceph_snap_context);
486         if (snap_count > size / sizeof (__le64))
487                 return false;
488
489         /*
490          * Not only that, but the size of the entire the snapshot
491          * header must also be representable in a size_t.
492          */
493         size -= snap_count * sizeof (__le64);
494         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
495                 return false;
496
497         return true;
498 }
499
500 /*
501  * Create a new header structure, translate header format from the on-disk
502  * header.
503  */
504 static int rbd_header_from_disk(struct rbd_image_header *header,
505                                  struct rbd_image_header_ondisk *ondisk)
506 {
507         u32 snap_count;
508         size_t len;
509         size_t size;
510         u32 i;
511
512         memset(header, 0, sizeof (*header));
513
514         snap_count = le32_to_cpu(ondisk->snap_count);
515
516         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
517         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
518         if (!header->object_prefix)
519                 return -ENOMEM;
520         memcpy(header->object_prefix, ondisk->object_prefix, len);
521         header->object_prefix[len] = '\0';
522
523         if (snap_count) {
524                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
525
526                 /* Save a copy of the snapshot names */
527
528                 if (snap_names_len > (u64) SIZE_MAX)
529                         return -EIO;
530                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
531                 if (!header->snap_names)
532                         goto out_err;
533                 /*
534                  * Note that rbd_dev_v1_header_read() guarantees
535                  * the ondisk buffer we're working with has
536                  * snap_names_len bytes beyond the end of the
537                  * snapshot id array, this memcpy() is safe.
538                  */
539                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
540                         snap_names_len);
541
542                 /* Record each snapshot's size */
543
544                 size = snap_count * sizeof (*header->snap_sizes);
545                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
546                 if (!header->snap_sizes)
547                         goto out_err;
548                 for (i = 0; i < snap_count; i++)
549                         header->snap_sizes[i] =
550                                 le64_to_cpu(ondisk->snaps[i].image_size);
551         } else {
552                 WARN_ON(ondisk->snap_names_len);
553                 header->snap_names = NULL;
554                 header->snap_sizes = NULL;
555         }
556
557         header->image_size = le64_to_cpu(ondisk->image_size);
558         header->obj_order = ondisk->options.order;
559         header->crypt_type = ondisk->options.crypt_type;
560         header->comp_type = ondisk->options.comp_type;
561         header->total_snaps = snap_count;
562
563         /* Allocate and fill in the snapshot context */
564
565         size = sizeof (struct ceph_snap_context);
566         size += snap_count * sizeof (header->snapc->snaps[0]);
567         header->snapc = kzalloc(size, GFP_KERNEL);
568         if (!header->snapc)
569                 goto out_err;
570
571         atomic_set(&header->snapc->nref, 1);
572         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
573         header->snapc->num_snaps = snap_count;
574         for (i = 0; i < snap_count; i++)
575                 header->snapc->snaps[i] =
576                         le64_to_cpu(ondisk->snaps[i].id);
577
578         return 0;
579
580 out_err:
581         kfree(header->snap_sizes);
582         header->snap_sizes = NULL;
583         kfree(header->snap_names);
584         header->snap_names = NULL;
585         kfree(header->object_prefix);
586         header->object_prefix = NULL;
587
588         return -ENOMEM;
589 }
590
591 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
592                         u64 *seq, u64 *size)
593 {
594         int i;
595         char *p = header->snap_names;
596
597         for (i = 0; i < header->total_snaps; i++) {
598                 if (!strcmp(snap_name, p)) {
599
600                         /* Found it.  Pass back its id and/or size */
601
602                         if (seq)
603                                 *seq = header->snapc->snaps[i];
604                         if (size)
605                                 *size = header->snap_sizes[i];
606                         return i;
607                 }
608                 p += strlen(p) + 1;     /* Skip ahead to the next name */
609         }
610         return -ENOENT;
611 }
612
613 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
614 {
615         int ret;
616
617         down_write(&rbd_dev->header_rwsem);
618
619         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
620                     sizeof (RBD_SNAP_HEAD_NAME))) {
621                 rbd_dev->snap_id = CEPH_NOSNAP;
622                 rbd_dev->snap_exists = false;
623                 rbd_dev->read_only = 0;
624                 if (size)
625                         *size = rbd_dev->header.image_size;
626         } else {
627                 u64 snap_id = 0;
628
629                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
630                                         &snap_id, size);
631                 if (ret < 0)
632                         goto done;
633                 rbd_dev->snap_id = snap_id;
634                 rbd_dev->snap_exists = true;
635                 rbd_dev->read_only = 1;
636         }
637
638         ret = 0;
639 done:
640         up_write(&rbd_dev->header_rwsem);
641         return ret;
642 }
643
644 static void rbd_header_free(struct rbd_image_header *header)
645 {
646         kfree(header->object_prefix);
647         header->object_prefix = NULL;
648         kfree(header->snap_sizes);
649         header->snap_sizes = NULL;
650         kfree(header->snap_names);
651         header->snap_names = NULL;
652         ceph_put_snap_context(header->snapc);
653         header->snapc = NULL;
654 }
655
656 /*
657  * get the actual striped segment name, offset and length
658  */
659 static u64 rbd_get_segment(struct rbd_image_header *header,
660                            const char *object_prefix,
661                            u64 ofs, u64 len,
662                            char *seg_name, u64 *segofs)
663 {
664         u64 seg = ofs >> header->obj_order;
665
666         if (seg_name)
667                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
668                          "%s.%012llx", object_prefix, seg);
669
670         ofs = ofs & ((1 << header->obj_order) - 1);
671         len = min_t(u64, len, (1 << header->obj_order) - ofs);
672
673         if (segofs)
674                 *segofs = ofs;
675
676         return len;
677 }
678
679 static int rbd_get_num_segments(struct rbd_image_header *header,
680                                 u64 ofs, u64 len)
681 {
682         u64 start_seg = ofs >> header->obj_order;
683         u64 end_seg = (ofs + len - 1) >> header->obj_order;
684         return end_seg - start_seg + 1;
685 }
686
687 /*
688  * returns the size of an object in the image
689  */
690 static u64 rbd_obj_bytes(struct rbd_image_header *header)
691 {
692         return 1 << header->obj_order;
693 }
694
695 /*
696  * bio helpers
697  */
698
699 static void bio_chain_put(struct bio *chain)
700 {
701         struct bio *tmp;
702
703         while (chain) {
704                 tmp = chain;
705                 chain = chain->bi_next;
706                 bio_put(tmp);
707         }
708 }
709
710 /*
711  * zeros a bio chain, starting at specific offset
712  */
713 static void zero_bio_chain(struct bio *chain, int start_ofs)
714 {
715         struct bio_vec *bv;
716         unsigned long flags;
717         void *buf;
718         int i;
719         int pos = 0;
720
721         while (chain) {
722                 bio_for_each_segment(bv, chain, i) {
723                         if (pos + bv->bv_len > start_ofs) {
724                                 int remainder = max(start_ofs - pos, 0);
725                                 buf = bvec_kmap_irq(bv, &flags);
726                                 memset(buf + remainder, 0,
727                                        bv->bv_len - remainder);
728                                 bvec_kunmap_irq(buf, &flags);
729                         }
730                         pos += bv->bv_len;
731                 }
732
733                 chain = chain->bi_next;
734         }
735 }
736
737 /*
738  * bio_chain_clone - clone a chain of bios up to a certain length.
739  * might return a bio_pair that will need to be released.
740  */
741 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
742                                    struct bio_pair **bp,
743                                    int len, gfp_t gfpmask)
744 {
745         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
746         int total = 0;
747
748         if (*bp) {
749                 bio_pair_release(*bp);
750                 *bp = NULL;
751         }
752
753         while (old_chain && (total < len)) {
754                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
755                 if (!tmp)
756                         goto err_out;
757
758                 if (total + old_chain->bi_size > len) {
759                         struct bio_pair *bp;
760
761                         /*
762                          * this split can only happen with a single paged bio,
763                          * split_bio will BUG_ON if this is not the case
764                          */
765                         dout("bio_chain_clone split! total=%d remaining=%d"
766                              "bi_size=%u\n",
767                              total, len - total, old_chain->bi_size);
768
769                         /* split the bio. We'll release it either in the next
770                            call, or it will have to be released outside */
771                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
772                         if (!bp)
773                                 goto err_out;
774
775                         __bio_clone(tmp, &bp->bio1);
776
777                         *next = &bp->bio2;
778                 } else {
779                         __bio_clone(tmp, old_chain);
780                         *next = old_chain->bi_next;
781                 }
782
783                 tmp->bi_bdev = NULL;
784                 gfpmask &= ~__GFP_WAIT;
785                 tmp->bi_next = NULL;
786
787                 if (!new_chain) {
788                         new_chain = tail = tmp;
789                 } else {
790                         tail->bi_next = tmp;
791                         tail = tmp;
792                 }
793                 old_chain = old_chain->bi_next;
794
795                 total += tmp->bi_size;
796         }
797
798         BUG_ON(total < len);
799
800         if (tail)
801                 tail->bi_next = NULL;
802
803         *old = old_chain;
804
805         return new_chain;
806
807 err_out:
808         dout("bio_chain_clone with err\n");
809         bio_chain_put(new_chain);
810         return NULL;
811 }
812
813 /*
814  * helpers for osd request op vectors.
815  */
816 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
817                                         int opcode, u32 payload_len)
818 {
819         struct ceph_osd_req_op *ops;
820
821         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
822         if (!ops)
823                 return NULL;
824
825         ops[0].op = opcode;
826
827         /*
828          * op extent offset and length will be set later on
829          * in calc_raw_layout()
830          */
831         ops[0].payload_len = payload_len;
832
833         return ops;
834 }
835
836 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
837 {
838         kfree(ops);
839 }
840
841 static void rbd_coll_end_req_index(struct request *rq,
842                                    struct rbd_req_coll *coll,
843                                    int index,
844                                    int ret, u64 len)
845 {
846         struct request_queue *q;
847         int min, max, i;
848
849         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
850              coll, index, ret, (unsigned long long) len);
851
852         if (!rq)
853                 return;
854
855         if (!coll) {
856                 blk_end_request(rq, ret, len);
857                 return;
858         }
859
860         q = rq->q;
861
862         spin_lock_irq(q->queue_lock);
863         coll->status[index].done = 1;
864         coll->status[index].rc = ret;
865         coll->status[index].bytes = len;
866         max = min = coll->num_done;
867         while (max < coll->total && coll->status[max].done)
868                 max++;
869
870         for (i = min; i<max; i++) {
871                 __blk_end_request(rq, coll->status[i].rc,
872                                   coll->status[i].bytes);
873                 coll->num_done++;
874                 kref_put(&coll->kref, rbd_coll_release);
875         }
876         spin_unlock_irq(q->queue_lock);
877 }
878
879 static void rbd_coll_end_req(struct rbd_request *req,
880                              int ret, u64 len)
881 {
882         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
883 }
884
885 /*
886  * Send ceph osd request
887  */
888 static int rbd_do_request(struct request *rq,
889                           struct rbd_device *rbd_dev,
890                           struct ceph_snap_context *snapc,
891                           u64 snapid,
892                           const char *object_name, u64 ofs, u64 len,
893                           struct bio *bio,
894                           struct page **pages,
895                           int num_pages,
896                           int flags,
897                           struct ceph_osd_req_op *ops,
898                           struct rbd_req_coll *coll,
899                           int coll_index,
900                           void (*rbd_cb)(struct ceph_osd_request *req,
901                                          struct ceph_msg *msg),
902                           struct ceph_osd_request **linger_req,
903                           u64 *ver)
904 {
905         struct ceph_osd_request *req;
906         struct ceph_file_layout *layout;
907         int ret;
908         u64 bno;
909         struct timespec mtime = CURRENT_TIME;
910         struct rbd_request *req_data;
911         struct ceph_osd_request_head *reqhead;
912         struct ceph_osd_client *osdc;
913
914         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
915         if (!req_data) {
916                 if (coll)
917                         rbd_coll_end_req_index(rq, coll, coll_index,
918                                                -ENOMEM, len);
919                 return -ENOMEM;
920         }
921
922         if (coll) {
923                 req_data->coll = coll;
924                 req_data->coll_index = coll_index;
925         }
926
927         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
928                 (unsigned long long) ofs, (unsigned long long) len);
929
930         osdc = &rbd_dev->rbd_client->client->osdc;
931         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
932                                         false, GFP_NOIO, pages, bio);
933         if (!req) {
934                 ret = -ENOMEM;
935                 goto done_pages;
936         }
937
938         req->r_callback = rbd_cb;
939
940         req_data->rq = rq;
941         req_data->bio = bio;
942         req_data->pages = pages;
943         req_data->len = len;
944
945         req->r_priv = req_data;
946
947         reqhead = req->r_request->front.iov_base;
948         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
949
950         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
951         req->r_oid_len = strlen(req->r_oid);
952
953         layout = &req->r_file_layout;
954         memset(layout, 0, sizeof(*layout));
955         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
956         layout->fl_stripe_count = cpu_to_le32(1);
957         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
958         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
959         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
960                                 req, ops);
961
962         ceph_osdc_build_request(req, ofs, &len,
963                                 ops,
964                                 snapc,
965                                 &mtime,
966                                 req->r_oid, req->r_oid_len);
967
968         if (linger_req) {
969                 ceph_osdc_set_request_linger(osdc, req);
970                 *linger_req = req;
971         }
972
973         ret = ceph_osdc_start_request(osdc, req, false);
974         if (ret < 0)
975                 goto done_err;
976
977         if (!rbd_cb) {
978                 ret = ceph_osdc_wait_request(osdc, req);
979                 if (ver)
980                         *ver = le64_to_cpu(req->r_reassert_version.version);
981                 dout("reassert_ver=%llu\n",
982                         (unsigned long long)
983                                 le64_to_cpu(req->r_reassert_version.version));
984                 ceph_osdc_put_request(req);
985         }
986         return ret;
987
988 done_err:
989         bio_chain_put(req_data->bio);
990         ceph_osdc_put_request(req);
991 done_pages:
992         rbd_coll_end_req(req_data, ret, len);
993         kfree(req_data);
994         return ret;
995 }
996
997 /*
998  * Ceph osd op callback
999  */
1000 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1001 {
1002         struct rbd_request *req_data = req->r_priv;
1003         struct ceph_osd_reply_head *replyhead;
1004         struct ceph_osd_op *op;
1005         __s32 rc;
1006         u64 bytes;
1007         int read_op;
1008
1009         /* parse reply */
1010         replyhead = msg->front.iov_base;
1011         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1012         op = (void *)(replyhead + 1);
1013         rc = le32_to_cpu(replyhead->result);
1014         bytes = le64_to_cpu(op->extent.length);
1015         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1016
1017         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1018                 (unsigned long long) bytes, read_op, (int) rc);
1019
1020         if (rc == -ENOENT && read_op) {
1021                 zero_bio_chain(req_data->bio, 0);
1022                 rc = 0;
1023         } else if (rc == 0 && read_op && bytes < req_data->len) {
1024                 zero_bio_chain(req_data->bio, bytes);
1025                 bytes = req_data->len;
1026         }
1027
1028         rbd_coll_end_req(req_data, rc, bytes);
1029
1030         if (req_data->bio)
1031                 bio_chain_put(req_data->bio);
1032
1033         ceph_osdc_put_request(req);
1034         kfree(req_data);
1035 }
1036
1037 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1038 {
1039         ceph_osdc_put_request(req);
1040 }
1041
1042 /*
1043  * Do a synchronous ceph osd operation
1044  */
1045 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1046                            struct ceph_snap_context *snapc,
1047                            u64 snapid,
1048                            int flags,
1049                            struct ceph_osd_req_op *ops,
1050                            const char *object_name,
1051                            u64 ofs, u64 len,
1052                            char *buf,
1053                            struct ceph_osd_request **linger_req,
1054                            u64 *ver)
1055 {
1056         int ret;
1057         struct page **pages;
1058         int num_pages;
1059
1060         BUG_ON(ops == NULL);
1061
1062         num_pages = calc_pages_for(ofs , len);
1063         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1064         if (IS_ERR(pages))
1065                 return PTR_ERR(pages);
1066
1067         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1068                           object_name, ofs, len, NULL,
1069                           pages, num_pages,
1070                           flags,
1071                           ops,
1072                           NULL, 0,
1073                           NULL,
1074                           linger_req, ver);
1075         if (ret < 0)
1076                 goto done;
1077
1078         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1079                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1080
1081 done:
1082         ceph_release_page_vector(pages, num_pages);
1083         return ret;
1084 }
1085
1086 /*
1087  * Do an asynchronous ceph osd operation
1088  */
1089 static int rbd_do_op(struct request *rq,
1090                      struct rbd_device *rbd_dev,
1091                      struct ceph_snap_context *snapc,
1092                      u64 snapid,
1093                      int opcode, int flags,
1094                      u64 ofs, u64 len,
1095                      struct bio *bio,
1096                      struct rbd_req_coll *coll,
1097                      int coll_index)
1098 {
1099         char *seg_name;
1100         u64 seg_ofs;
1101         u64 seg_len;
1102         int ret;
1103         struct ceph_osd_req_op *ops;
1104         u32 payload_len;
1105
1106         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1107         if (!seg_name)
1108                 return -ENOMEM;
1109
1110         seg_len = rbd_get_segment(&rbd_dev->header,
1111                                   rbd_dev->header.object_prefix,
1112                                   ofs, len,
1113                                   seg_name, &seg_ofs);
1114
1115         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1116
1117         ret = -ENOMEM;
1118         ops = rbd_create_rw_ops(1, opcode, payload_len);
1119         if (!ops)
1120                 goto done;
1121
1122         /* we've taken care of segment sizes earlier when we
1123            cloned the bios. We should never have a segment
1124            truncated at this point */
1125         BUG_ON(seg_len < len);
1126
1127         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1128                              seg_name, seg_ofs, seg_len,
1129                              bio,
1130                              NULL, 0,
1131                              flags,
1132                              ops,
1133                              coll, coll_index,
1134                              rbd_req_cb, 0, NULL);
1135
1136         rbd_destroy_ops(ops);
1137 done:
1138         kfree(seg_name);
1139         return ret;
1140 }
1141
1142 /*
1143  * Request async osd write
1144  */
1145 static int rbd_req_write(struct request *rq,
1146                          struct rbd_device *rbd_dev,
1147                          struct ceph_snap_context *snapc,
1148                          u64 ofs, u64 len,
1149                          struct bio *bio,
1150                          struct rbd_req_coll *coll,
1151                          int coll_index)
1152 {
1153         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1154                          CEPH_OSD_OP_WRITE,
1155                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1156                          ofs, len, bio, coll, coll_index);
1157 }
1158
1159 /*
1160  * Request async osd read
1161  */
1162 static int rbd_req_read(struct request *rq,
1163                          struct rbd_device *rbd_dev,
1164                          u64 snapid,
1165                          u64 ofs, u64 len,
1166                          struct bio *bio,
1167                          struct rbd_req_coll *coll,
1168                          int coll_index)
1169 {
1170         return rbd_do_op(rq, rbd_dev, NULL,
1171                          snapid,
1172                          CEPH_OSD_OP_READ,
1173                          CEPH_OSD_FLAG_READ,
1174                          ofs, len, bio, coll, coll_index);
1175 }
1176
1177 /*
1178  * Request sync osd read
1179  */
1180 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1181                           u64 snapid,
1182                           const char *object_name,
1183                           u64 ofs, u64 len,
1184                           char *buf,
1185                           u64 *ver)
1186 {
1187         struct ceph_osd_req_op *ops;
1188         int ret;
1189
1190         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1191         if (!ops)
1192                 return -ENOMEM;
1193
1194         ret = rbd_req_sync_op(rbd_dev, NULL,
1195                                snapid,
1196                                CEPH_OSD_FLAG_READ,
1197                                ops, object_name, ofs, len, buf, NULL, ver);
1198         rbd_destroy_ops(ops);
1199
1200         return ret;
1201 }
1202
1203 /*
1204  * Request sync osd watch
1205  */
1206 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1207                                    u64 ver,
1208                                    u64 notify_id)
1209 {
1210         struct ceph_osd_req_op *ops;
1211         int ret;
1212
1213         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1214         if (!ops)
1215                 return -ENOMEM;
1216
1217         ops[0].watch.ver = cpu_to_le64(ver);
1218         ops[0].watch.cookie = notify_id;
1219         ops[0].watch.flag = 0;
1220
1221         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1222                           rbd_dev->header_name, 0, 0, NULL,
1223                           NULL, 0,
1224                           CEPH_OSD_FLAG_READ,
1225                           ops,
1226                           NULL, 0,
1227                           rbd_simple_req_cb, 0, NULL);
1228
1229         rbd_destroy_ops(ops);
1230         return ret;
1231 }
1232
1233 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1234 {
1235         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1236         u64 hver;
1237         int rc;
1238
1239         if (!rbd_dev)
1240                 return;
1241
1242         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1243                 rbd_dev->header_name, (unsigned long long) notify_id,
1244                 (unsigned int) opcode);
1245         rc = rbd_refresh_header(rbd_dev, &hver);
1246         if (rc)
1247                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1248                            " update snaps: %d\n", rbd_dev->major, rc);
1249
1250         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1251 }
1252
1253 /*
1254  * Request sync osd watch
1255  */
1256 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1257 {
1258         struct ceph_osd_req_op *ops;
1259         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1260         int ret;
1261
1262         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1263         if (!ops)
1264                 return -ENOMEM;
1265
1266         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1267                                      (void *)rbd_dev, &rbd_dev->watch_event);
1268         if (ret < 0)
1269                 goto fail;
1270
1271         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1272         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1273         ops[0].watch.flag = 1;
1274
1275         ret = rbd_req_sync_op(rbd_dev, NULL,
1276                               CEPH_NOSNAP,
1277                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1278                               ops,
1279                               rbd_dev->header_name,
1280                               0, 0, NULL,
1281                               &rbd_dev->watch_request, NULL);
1282
1283         if (ret < 0)
1284                 goto fail_event;
1285
1286         rbd_destroy_ops(ops);
1287         return 0;
1288
1289 fail_event:
1290         ceph_osdc_cancel_event(rbd_dev->watch_event);
1291         rbd_dev->watch_event = NULL;
1292 fail:
1293         rbd_destroy_ops(ops);
1294         return ret;
1295 }
1296
1297 /*
1298  * Request sync osd unwatch
1299  */
1300 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1301 {
1302         struct ceph_osd_req_op *ops;
1303         int ret;
1304
1305         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1306         if (!ops)
1307                 return -ENOMEM;
1308
1309         ops[0].watch.ver = 0;
1310         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1311         ops[0].watch.flag = 0;
1312
1313         ret = rbd_req_sync_op(rbd_dev, NULL,
1314                               CEPH_NOSNAP,
1315                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1316                               ops,
1317                               rbd_dev->header_name,
1318                               0, 0, NULL, NULL, NULL);
1319
1320
1321         rbd_destroy_ops(ops);
1322         ceph_osdc_cancel_event(rbd_dev->watch_event);
1323         rbd_dev->watch_event = NULL;
1324         return ret;
1325 }
1326
1327 struct rbd_notify_info {
1328         struct rbd_device *rbd_dev;
1329 };
1330
1331 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1332 {
1333         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1334         if (!rbd_dev)
1335                 return;
1336
1337         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1338                         rbd_dev->header_name, (unsigned long long) notify_id,
1339                         (unsigned int) opcode);
1340 }
1341
1342 /*
1343  * Request sync osd notify
1344  */
1345 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1346 {
1347         struct ceph_osd_req_op *ops;
1348         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1349         struct ceph_osd_event *event;
1350         struct rbd_notify_info info;
1351         int payload_len = sizeof(u32) + sizeof(u32);
1352         int ret;
1353
1354         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1355         if (!ops)
1356                 return -ENOMEM;
1357
1358         info.rbd_dev = rbd_dev;
1359
1360         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1361                                      (void *)&info, &event);
1362         if (ret < 0)
1363                 goto fail;
1364
1365         ops[0].watch.ver = 1;
1366         ops[0].watch.flag = 1;
1367         ops[0].watch.cookie = event->cookie;
1368         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1369         ops[0].watch.timeout = 12;
1370
1371         ret = rbd_req_sync_op(rbd_dev, NULL,
1372                                CEPH_NOSNAP,
1373                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1374                                ops,
1375                                rbd_dev->header_name,
1376                                0, 0, NULL, NULL, NULL);
1377         if (ret < 0)
1378                 goto fail_event;
1379
1380         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1381         dout("ceph_osdc_wait_event returned %d\n", ret);
1382         rbd_destroy_ops(ops);
1383         return 0;
1384
1385 fail_event:
1386         ceph_osdc_cancel_event(event);
1387 fail:
1388         rbd_destroy_ops(ops);
1389         return ret;
1390 }
1391
1392 /*
1393  * Request sync osd read
1394  */
1395 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1396                              const char *object_name,
1397                              const char *class_name,
1398                              const char *method_name,
1399                              const char *data,
1400                              int len,
1401                              u64 *ver)
1402 {
1403         struct ceph_osd_req_op *ops;
1404         int class_name_len = strlen(class_name);
1405         int method_name_len = strlen(method_name);
1406         int ret;
1407
1408         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1409                                     class_name_len + method_name_len + len);
1410         if (!ops)
1411                 return -ENOMEM;
1412
1413         ops[0].cls.class_name = class_name;
1414         ops[0].cls.class_len = (__u8) class_name_len;
1415         ops[0].cls.method_name = method_name;
1416         ops[0].cls.method_len = (__u8) method_name_len;
1417         ops[0].cls.argc = 0;
1418         ops[0].cls.indata = data;
1419         ops[0].cls.indata_len = len;
1420
1421         ret = rbd_req_sync_op(rbd_dev, NULL,
1422                                CEPH_NOSNAP,
1423                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1424                                ops,
1425                                object_name, 0, 0, NULL, NULL, ver);
1426
1427         rbd_destroy_ops(ops);
1428
1429         dout("cls_exec returned %d\n", ret);
1430         return ret;
1431 }
1432
1433 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1434 {
1435         struct rbd_req_coll *coll =
1436                         kzalloc(sizeof(struct rbd_req_coll) +
1437                                 sizeof(struct rbd_req_status) * num_reqs,
1438                                 GFP_ATOMIC);
1439
1440         if (!coll)
1441                 return NULL;
1442         coll->total = num_reqs;
1443         kref_init(&coll->kref);
1444         return coll;
1445 }
1446
1447 /*
1448  * block device queue callback
1449  */
1450 static void rbd_rq_fn(struct request_queue *q)
1451 {
1452         struct rbd_device *rbd_dev = q->queuedata;
1453         struct request *rq;
1454         struct bio_pair *bp = NULL;
1455
1456         while ((rq = blk_fetch_request(q))) {
1457                 struct bio *bio;
1458                 struct bio *rq_bio, *next_bio = NULL;
1459                 bool do_write;
1460                 unsigned int size;
1461                 u64 op_size = 0;
1462                 u64 ofs;
1463                 int num_segs, cur_seg = 0;
1464                 struct rbd_req_coll *coll;
1465                 struct ceph_snap_context *snapc;
1466
1467                 /* peek at request from block layer */
1468                 if (!rq)
1469                         break;
1470
1471                 dout("fetched request\n");
1472
1473                 /* filter out block requests we don't understand */
1474                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1475                         __blk_end_request_all(rq, 0);
1476                         continue;
1477                 }
1478
1479                 /* deduce our operation (read, write) */
1480                 do_write = (rq_data_dir(rq) == WRITE);
1481
1482                 size = blk_rq_bytes(rq);
1483                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1484                 rq_bio = rq->bio;
1485                 if (do_write && rbd_dev->read_only) {
1486                         __blk_end_request_all(rq, -EROFS);
1487                         continue;
1488                 }
1489
1490                 spin_unlock_irq(q->queue_lock);
1491
1492                 down_read(&rbd_dev->header_rwsem);
1493
1494                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1495                         up_read(&rbd_dev->header_rwsem);
1496                         dout("request for non-existent snapshot");
1497                         spin_lock_irq(q->queue_lock);
1498                         __blk_end_request_all(rq, -ENXIO);
1499                         continue;
1500                 }
1501
1502                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1503
1504                 up_read(&rbd_dev->header_rwsem);
1505
1506                 dout("%s 0x%x bytes at 0x%llx\n",
1507                      do_write ? "write" : "read",
1508                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1509
1510                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1511                 coll = rbd_alloc_coll(num_segs);
1512                 if (!coll) {
1513                         spin_lock_irq(q->queue_lock);
1514                         __blk_end_request_all(rq, -ENOMEM);
1515                         ceph_put_snap_context(snapc);
1516                         continue;
1517                 }
1518
1519                 do {
1520                         /* a bio clone to be passed down to OSD req */
1521                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1522                         op_size = rbd_get_segment(&rbd_dev->header,
1523                                                   rbd_dev->header.object_prefix,
1524                                                   ofs, size,
1525                                                   NULL, NULL);
1526                         kref_get(&coll->kref);
1527                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1528                                               op_size, GFP_ATOMIC);
1529                         if (!bio) {
1530                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1531                                                        -ENOMEM, op_size);
1532                                 goto next_seg;
1533                         }
1534
1535
1536                         /* init OSD command: write or read */
1537                         if (do_write)
1538                                 rbd_req_write(rq, rbd_dev,
1539                                               snapc,
1540                                               ofs,
1541                                               op_size, bio,
1542                                               coll, cur_seg);
1543                         else
1544                                 rbd_req_read(rq, rbd_dev,
1545                                              rbd_dev->snap_id,
1546                                              ofs,
1547                                              op_size, bio,
1548                                              coll, cur_seg);
1549
1550 next_seg:
1551                         size -= op_size;
1552                         ofs += op_size;
1553
1554                         cur_seg++;
1555                         rq_bio = next_bio;
1556                 } while (size > 0);
1557                 kref_put(&coll->kref, rbd_coll_release);
1558
1559                 if (bp)
1560                         bio_pair_release(bp);
1561                 spin_lock_irq(q->queue_lock);
1562
1563                 ceph_put_snap_context(snapc);
1564         }
1565 }
1566
1567 /*
1568  * a queue callback. Makes sure that we don't create a bio that spans across
1569  * multiple osd objects. One exception would be with a single page bios,
1570  * which we handle later at bio_chain_clone
1571  */
1572 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1573                           struct bio_vec *bvec)
1574 {
1575         struct rbd_device *rbd_dev = q->queuedata;
1576         unsigned int chunk_sectors;
1577         sector_t sector;
1578         unsigned int bio_sectors;
1579         int max;
1580
1581         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1582         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1583         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1584
1585         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1586                                  + bio_sectors)) << SECTOR_SHIFT;
1587         if (max < 0)
1588                 max = 0; /* bio_add cannot handle a negative return */
1589         if (max <= bvec->bv_len && bio_sectors == 0)
1590                 return bvec->bv_len;
1591         return max;
1592 }
1593
1594 static void rbd_free_disk(struct rbd_device *rbd_dev)
1595 {
1596         struct gendisk *disk = rbd_dev->disk;
1597
1598         if (!disk)
1599                 return;
1600
1601         rbd_header_free(&rbd_dev->header);
1602
1603         if (disk->flags & GENHD_FL_UP)
1604                 del_gendisk(disk);
1605         if (disk->queue)
1606                 blk_cleanup_queue(disk->queue);
1607         put_disk(disk);
1608 }
1609
1610 /*
1611  * Read the complete header for the given rbd device.
1612  *
1613  * Returns a pointer to a dynamically-allocated buffer containing
1614  * the complete and validated header.  Caller can pass the address
1615  * of a variable that will be filled in with the version of the
1616  * header object at the time it was read.
1617  *
1618  * Returns a pointer-coded errno if a failure occurs.
1619  */
1620 static struct rbd_image_header_ondisk *
1621 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1622 {
1623         struct rbd_image_header_ondisk *ondisk = NULL;
1624         u32 snap_count = 0;
1625         u64 names_size = 0;
1626         u32 want_count;
1627         int ret;
1628
1629         /*
1630          * The complete header will include an array of its 64-bit
1631          * snapshot ids, followed by the names of those snapshots as
1632          * a contiguous block of NUL-terminated strings.  Note that
1633          * the number of snapshots could change by the time we read
1634          * it in, in which case we re-read it.
1635          */
1636         do {
1637                 size_t size;
1638
1639                 kfree(ondisk);
1640
1641                 size = sizeof (*ondisk);
1642                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1643                 size += names_size;
1644                 ondisk = kmalloc(size, GFP_KERNEL);
1645                 if (!ondisk)
1646                         return ERR_PTR(-ENOMEM);
1647
1648                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1649                                        rbd_dev->header_name,
1650                                        0, size,
1651                                        (char *) ondisk, version);
1652
1653                 if (ret < 0)
1654                         goto out_err;
1655                 if (WARN_ON((size_t) ret < size)) {
1656                         ret = -ENXIO;
1657                         pr_warning("short header read for image %s"
1658                                         " (want %zd got %d)\n",
1659                                 rbd_dev->image_name, size, ret);
1660                         goto out_err;
1661                 }
1662                 if (!rbd_dev_ondisk_valid(ondisk)) {
1663                         ret = -ENXIO;
1664                         pr_warning("invalid header for image %s\n",
1665                                 rbd_dev->image_name);
1666                         goto out_err;
1667                 }
1668
1669                 names_size = le64_to_cpu(ondisk->snap_names_len);
1670                 want_count = snap_count;
1671                 snap_count = le32_to_cpu(ondisk->snap_count);
1672         } while (snap_count != want_count);
1673
1674         return ondisk;
1675
1676 out_err:
1677         kfree(ondisk);
1678
1679         return ERR_PTR(ret);
1680 }
1681
1682 /*
1683  * reload the ondisk the header
1684  */
1685 static int rbd_read_header(struct rbd_device *rbd_dev,
1686                            struct rbd_image_header *header)
1687 {
1688         struct rbd_image_header_ondisk *ondisk;
1689         u64 ver = 0;
1690         int ret;
1691
1692         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1693         if (IS_ERR(ondisk))
1694                 return PTR_ERR(ondisk);
1695         ret = rbd_header_from_disk(header, ondisk);
1696         if (ret >= 0)
1697                 header->obj_version = ver;
1698         kfree(ondisk);
1699
1700         return ret;
1701 }
1702
1703 /*
1704  * create a snapshot
1705  */
1706 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1707                                const char *snap_name,
1708                                gfp_t gfp_flags)
1709 {
1710         int name_len = strlen(snap_name);
1711         u64 new_snapid;
1712         int ret;
1713         void *data, *p, *e;
1714         struct ceph_mon_client *monc;
1715
1716         /* we should create a snapshot only if we're pointing at the head */
1717         if (rbd_dev->snap_id != CEPH_NOSNAP)
1718                 return -EINVAL;
1719
1720         monc = &rbd_dev->rbd_client->client->monc;
1721         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1722         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1723         if (ret < 0)
1724                 return ret;
1725
1726         data = kmalloc(name_len + 16, gfp_flags);
1727         if (!data)
1728                 return -ENOMEM;
1729
1730         p = data;
1731         e = data + name_len + 16;
1732
1733         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1734         ceph_encode_64_safe(&p, e, new_snapid, bad);
1735
1736         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1737                                 "rbd", "snap_add",
1738                                 data, p - data, NULL);
1739
1740         kfree(data);
1741
1742         return ret < 0 ? ret : 0;
1743 bad:
1744         return -ERANGE;
1745 }
1746
1747 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1748 {
1749         struct rbd_snap *snap;
1750         struct rbd_snap *next;
1751
1752         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1753                 __rbd_remove_snap_dev(snap);
1754 }
1755
1756 /*
1757  * only read the first part of the ondisk header, without the snaps info
1758  */
1759 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1760 {
1761         int ret;
1762         struct rbd_image_header h;
1763
1764         ret = rbd_read_header(rbd_dev, &h);
1765         if (ret < 0)
1766                 return ret;
1767
1768         down_write(&rbd_dev->header_rwsem);
1769
1770         /* resized? */
1771         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1772                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1773
1774                 dout("setting size to %llu sectors", (unsigned long long) size);
1775                 set_capacity(rbd_dev->disk, size);
1776         }
1777
1778         /* rbd_dev->header.object_prefix shouldn't change */
1779         kfree(rbd_dev->header.snap_sizes);
1780         kfree(rbd_dev->header.snap_names);
1781         /* osd requests may still refer to snapc */
1782         ceph_put_snap_context(rbd_dev->header.snapc);
1783
1784         if (hver)
1785                 *hver = h.obj_version;
1786         rbd_dev->header.obj_version = h.obj_version;
1787         rbd_dev->header.image_size = h.image_size;
1788         rbd_dev->header.total_snaps = h.total_snaps;
1789         rbd_dev->header.snapc = h.snapc;
1790         rbd_dev->header.snap_names = h.snap_names;
1791         rbd_dev->header.snap_sizes = h.snap_sizes;
1792         /* Free the extra copy of the object prefix */
1793         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1794         kfree(h.object_prefix);
1795
1796         ret = __rbd_init_snaps_header(rbd_dev);
1797
1798         up_write(&rbd_dev->header_rwsem);
1799
1800         return ret;
1801 }
1802
1803 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1804 {
1805         int ret;
1806
1807         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1808         ret = __rbd_refresh_header(rbd_dev, hver);
1809         mutex_unlock(&ctl_mutex);
1810
1811         return ret;
1812 }
1813
1814 static int rbd_init_disk(struct rbd_device *rbd_dev)
1815 {
1816         struct gendisk *disk;
1817         struct request_queue *q;
1818         int rc;
1819         u64 segment_size;
1820         u64 total_size = 0;
1821
1822         /* contact OSD, request size info about the object being mapped */
1823         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1824         if (rc)
1825                 return rc;
1826
1827         /* no need to lock here, as rbd_dev is not registered yet */
1828         rc = __rbd_init_snaps_header(rbd_dev);
1829         if (rc)
1830                 return rc;
1831
1832         rc = rbd_header_set_snap(rbd_dev, &total_size);
1833         if (rc)
1834                 return rc;
1835
1836         /* create gendisk info */
1837         rc = -ENOMEM;
1838         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1839         if (!disk)
1840                 goto out;
1841
1842         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1843                  rbd_dev->dev_id);
1844         disk->major = rbd_dev->major;
1845         disk->first_minor = 0;
1846         disk->fops = &rbd_bd_ops;
1847         disk->private_data = rbd_dev;
1848
1849         /* init rq */
1850         rc = -ENOMEM;
1851         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1852         if (!q)
1853                 goto out_disk;
1854
1855         /* We use the default size, but let's be explicit about it. */
1856         blk_queue_physical_block_size(q, SECTOR_SIZE);
1857
1858         /* set io sizes to object size */
1859         segment_size = rbd_obj_bytes(&rbd_dev->header);
1860         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1861         blk_queue_max_segment_size(q, segment_size);
1862         blk_queue_io_min(q, segment_size);
1863         blk_queue_io_opt(q, segment_size);
1864
1865         blk_queue_merge_bvec(q, rbd_merge_bvec);
1866         disk->queue = q;
1867
1868         q->queuedata = rbd_dev;
1869
1870         rbd_dev->disk = disk;
1871         rbd_dev->q = q;
1872
1873         /* finally, announce the disk to the world */
1874         set_capacity(disk, total_size / SECTOR_SIZE);
1875         add_disk(disk);
1876
1877         pr_info("%s: added with size 0x%llx\n",
1878                 disk->disk_name, (unsigned long long)total_size);
1879         return 0;
1880
1881 out_disk:
1882         put_disk(disk);
1883 out:
1884         return rc;
1885 }
1886
1887 /*
1888   sysfs
1889 */
1890
1891 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1892 {
1893         return container_of(dev, struct rbd_device, dev);
1894 }
1895
1896 static ssize_t rbd_size_show(struct device *dev,
1897                              struct device_attribute *attr, char *buf)
1898 {
1899         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1900         sector_t size;
1901
1902         down_read(&rbd_dev->header_rwsem);
1903         size = get_capacity(rbd_dev->disk);
1904         up_read(&rbd_dev->header_rwsem);
1905
1906         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1907 }
1908
1909 static ssize_t rbd_major_show(struct device *dev,
1910                               struct device_attribute *attr, char *buf)
1911 {
1912         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913
1914         return sprintf(buf, "%d\n", rbd_dev->major);
1915 }
1916
1917 static ssize_t rbd_client_id_show(struct device *dev,
1918                                   struct device_attribute *attr, char *buf)
1919 {
1920         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1921
1922         return sprintf(buf, "client%lld\n",
1923                         ceph_client_id(rbd_dev->rbd_client->client));
1924 }
1925
1926 static ssize_t rbd_pool_show(struct device *dev,
1927                              struct device_attribute *attr, char *buf)
1928 {
1929         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1930
1931         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1932 }
1933
1934 static ssize_t rbd_pool_id_show(struct device *dev,
1935                              struct device_attribute *attr, char *buf)
1936 {
1937         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1938
1939         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1940 }
1941
1942 static ssize_t rbd_name_show(struct device *dev,
1943                              struct device_attribute *attr, char *buf)
1944 {
1945         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1946
1947         return sprintf(buf, "%s\n", rbd_dev->image_name);
1948 }
1949
1950 static ssize_t rbd_snap_show(struct device *dev,
1951                              struct device_attribute *attr,
1952                              char *buf)
1953 {
1954         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1955
1956         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1957 }
1958
1959 static ssize_t rbd_image_refresh(struct device *dev,
1960                                  struct device_attribute *attr,
1961                                  const char *buf,
1962                                  size_t size)
1963 {
1964         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1965         int ret;
1966
1967         ret = rbd_refresh_header(rbd_dev, NULL);
1968
1969         return ret < 0 ? ret : size;
1970 }
1971
1972 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1973 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1974 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1975 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1976 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1977 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1978 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1979 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1980 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1981
1982 static struct attribute *rbd_attrs[] = {
1983         &dev_attr_size.attr,
1984         &dev_attr_major.attr,
1985         &dev_attr_client_id.attr,
1986         &dev_attr_pool.attr,
1987         &dev_attr_pool_id.attr,
1988         &dev_attr_name.attr,
1989         &dev_attr_current_snap.attr,
1990         &dev_attr_refresh.attr,
1991         &dev_attr_create_snap.attr,
1992         NULL
1993 };
1994
1995 static struct attribute_group rbd_attr_group = {
1996         .attrs = rbd_attrs,
1997 };
1998
1999 static const struct attribute_group *rbd_attr_groups[] = {
2000         &rbd_attr_group,
2001         NULL
2002 };
2003
2004 static void rbd_sysfs_dev_release(struct device *dev)
2005 {
2006 }
2007
2008 static struct device_type rbd_device_type = {
2009         .name           = "rbd",
2010         .groups         = rbd_attr_groups,
2011         .release        = rbd_sysfs_dev_release,
2012 };
2013
2014
2015 /*
2016   sysfs - snapshots
2017 */
2018
2019 static ssize_t rbd_snap_size_show(struct device *dev,
2020                                   struct device_attribute *attr,
2021                                   char *buf)
2022 {
2023         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2024
2025         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2026 }
2027
2028 static ssize_t rbd_snap_id_show(struct device *dev,
2029                                 struct device_attribute *attr,
2030                                 char *buf)
2031 {
2032         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2033
2034         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2035 }
2036
2037 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2038 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2039
2040 static struct attribute *rbd_snap_attrs[] = {
2041         &dev_attr_snap_size.attr,
2042         &dev_attr_snap_id.attr,
2043         NULL,
2044 };
2045
2046 static struct attribute_group rbd_snap_attr_group = {
2047         .attrs = rbd_snap_attrs,
2048 };
2049
2050 static void rbd_snap_dev_release(struct device *dev)
2051 {
2052         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2053         kfree(snap->name);
2054         kfree(snap);
2055 }
2056
2057 static const struct attribute_group *rbd_snap_attr_groups[] = {
2058         &rbd_snap_attr_group,
2059         NULL
2060 };
2061
2062 static struct device_type rbd_snap_device_type = {
2063         .groups         = rbd_snap_attr_groups,
2064         .release        = rbd_snap_dev_release,
2065 };
2066
2067 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2068 {
2069         list_del(&snap->node);
2070         device_unregister(&snap->dev);
2071 }
2072
2073 static int rbd_register_snap_dev(struct rbd_snap *snap,
2074                                   struct device *parent)
2075 {
2076         struct device *dev = &snap->dev;
2077         int ret;
2078
2079         dev->type = &rbd_snap_device_type;
2080         dev->parent = parent;
2081         dev->release = rbd_snap_dev_release;
2082         dev_set_name(dev, "snap_%s", snap->name);
2083         ret = device_register(dev);
2084
2085         return ret;
2086 }
2087
2088 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2089                                               int i, const char *name)
2090 {
2091         struct rbd_snap *snap;
2092         int ret;
2093
2094         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2095         if (!snap)
2096                 return ERR_PTR(-ENOMEM);
2097
2098         ret = -ENOMEM;
2099         snap->name = kstrdup(name, GFP_KERNEL);
2100         if (!snap->name)
2101                 goto err;
2102
2103         snap->size = rbd_dev->header.snap_sizes[i];
2104         snap->id = rbd_dev->header.snapc->snaps[i];
2105         if (device_is_registered(&rbd_dev->dev)) {
2106                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2107                 if (ret < 0)
2108                         goto err;
2109         }
2110
2111         return snap;
2112
2113 err:
2114         kfree(snap->name);
2115         kfree(snap);
2116
2117         return ERR_PTR(ret);
2118 }
2119
2120 /*
2121  * Scan the rbd device's current snapshot list and compare it to the
2122  * newly-received snapshot context.  Remove any existing snapshots
2123  * not present in the new snapshot context.  Add a new snapshot for
2124  * any snaphots in the snapshot context not in the current list.
2125  * And verify there are no changes to snapshots we already know
2126  * about.
2127  *
2128  * Assumes the snapshots in the snapshot context are sorted by
2129  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2130  * are also maintained in that order.)
2131  */
2132 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2133 {
2134         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2135         const u32 snap_count = snapc->num_snaps;
2136         char *snap_name = rbd_dev->header.snap_names;
2137         struct list_head *head = &rbd_dev->snaps;
2138         struct list_head *links = head->next;
2139         u32 index = 0;
2140
2141         while (index < snap_count || links != head) {
2142                 u64 snap_id;
2143                 struct rbd_snap *snap;
2144
2145                 snap_id = index < snap_count ? snapc->snaps[index]
2146                                              : CEPH_NOSNAP;
2147                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2148                                      : NULL;
2149                 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2150
2151                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2152                         struct list_head *next = links->next;
2153
2154                         /* Existing snapshot not in the new snap context */
2155
2156                         if (rbd_dev->snap_id == snap->id)
2157                                 rbd_dev->snap_exists = false;
2158                         __rbd_remove_snap_dev(snap);
2159
2160                         /* Done with this list entry; advance */
2161
2162                         links = next;
2163                         continue;
2164                 }
2165
2166                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2167                         struct rbd_snap *new_snap;
2168
2169                         /* We haven't seen this snapshot before */
2170
2171                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2172                                                         snap_name);
2173                         if (IS_ERR(new_snap))
2174                                 return PTR_ERR(new_snap);
2175
2176                         /* New goes before existing, or at end of list */
2177
2178                         if (snap)
2179                                 list_add_tail(&new_snap->node, &snap->node);
2180                         else
2181                                 list_add_tail(&new_snap->node, head);
2182                 } else {
2183                         /* Already have this one */
2184
2185                         BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2186                         BUG_ON(strcmp(snap->name, snap_name));
2187
2188                         /* Done with this list entry; advance */
2189
2190                         links = links->next;
2191                 }
2192
2193                 /* Advance to the next entry in the snapshot context */
2194
2195                 index++;
2196                 snap_name += strlen(snap_name) + 1;
2197         }
2198
2199         return 0;
2200 }
2201
2202 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2203 {
2204         int ret;
2205         struct device *dev;
2206         struct rbd_snap *snap;
2207
2208         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2209         dev = &rbd_dev->dev;
2210
2211         dev->bus = &rbd_bus_type;
2212         dev->type = &rbd_device_type;
2213         dev->parent = &rbd_root_dev;
2214         dev->release = rbd_dev_release;
2215         dev_set_name(dev, "%d", rbd_dev->dev_id);
2216         ret = device_register(dev);
2217         if (ret < 0)
2218                 goto out;
2219
2220         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2221                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2222                 if (ret < 0)
2223                         break;
2224         }
2225 out:
2226         mutex_unlock(&ctl_mutex);
2227         return ret;
2228 }
2229
2230 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2231 {
2232         device_unregister(&rbd_dev->dev);
2233 }
2234
2235 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2236 {
2237         int ret, rc;
2238
2239         do {
2240                 ret = rbd_req_sync_watch(rbd_dev);
2241                 if (ret == -ERANGE) {
2242                         rc = rbd_refresh_header(rbd_dev, NULL);
2243                         if (rc < 0)
2244                                 return rc;
2245                 }
2246         } while (ret == -ERANGE);
2247
2248         return ret;
2249 }
2250
2251 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2252
2253 /*
2254  * Get a unique rbd identifier for the given new rbd_dev, and add
2255  * the rbd_dev to the global list.  The minimum rbd id is 1.
2256  */
2257 static void rbd_id_get(struct rbd_device *rbd_dev)
2258 {
2259         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2260
2261         spin_lock(&rbd_dev_list_lock);
2262         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2263         spin_unlock(&rbd_dev_list_lock);
2264 }
2265
2266 /*
2267  * Remove an rbd_dev from the global list, and record that its
2268  * identifier is no longer in use.
2269  */
2270 static void rbd_id_put(struct rbd_device *rbd_dev)
2271 {
2272         struct list_head *tmp;
2273         int rbd_id = rbd_dev->dev_id;
2274         int max_id;
2275
2276         BUG_ON(rbd_id < 1);
2277
2278         spin_lock(&rbd_dev_list_lock);
2279         list_del_init(&rbd_dev->node);
2280
2281         /*
2282          * If the id being "put" is not the current maximum, there
2283          * is nothing special we need to do.
2284          */
2285         if (rbd_id != atomic64_read(&rbd_id_max)) {
2286                 spin_unlock(&rbd_dev_list_lock);
2287                 return;
2288         }
2289
2290         /*
2291          * We need to update the current maximum id.  Search the
2292          * list to find out what it is.  We're more likely to find
2293          * the maximum at the end, so search the list backward.
2294          */
2295         max_id = 0;
2296         list_for_each_prev(tmp, &rbd_dev_list) {
2297                 struct rbd_device *rbd_dev;
2298
2299                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2300                 if (rbd_id > max_id)
2301                         max_id = rbd_id;
2302         }
2303         spin_unlock(&rbd_dev_list_lock);
2304
2305         /*
2306          * The max id could have been updated by rbd_id_get(), in
2307          * which case it now accurately reflects the new maximum.
2308          * Be careful not to overwrite the maximum value in that
2309          * case.
2310          */
2311         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2312 }
2313
2314 /*
2315  * Skips over white space at *buf, and updates *buf to point to the
2316  * first found non-space character (if any). Returns the length of
2317  * the token (string of non-white space characters) found.  Note
2318  * that *buf must be terminated with '\0'.
2319  */
2320 static inline size_t next_token(const char **buf)
2321 {
2322         /*
2323         * These are the characters that produce nonzero for
2324         * isspace() in the "C" and "POSIX" locales.
2325         */
2326         const char *spaces = " \f\n\r\t\v";
2327
2328         *buf += strspn(*buf, spaces);   /* Find start of token */
2329
2330         return strcspn(*buf, spaces);   /* Return token length */
2331 }
2332
2333 /*
2334  * Finds the next token in *buf, and if the provided token buffer is
2335  * big enough, copies the found token into it.  The result, if
2336  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2337  * must be terminated with '\0' on entry.
2338  *
2339  * Returns the length of the token found (not including the '\0').
2340  * Return value will be 0 if no token is found, and it will be >=
2341  * token_size if the token would not fit.
2342  *
2343  * The *buf pointer will be updated to point beyond the end of the
2344  * found token.  Note that this occurs even if the token buffer is
2345  * too small to hold it.
2346  */
2347 static inline size_t copy_token(const char **buf,
2348                                 char *token,
2349                                 size_t token_size)
2350 {
2351         size_t len;
2352
2353         len = next_token(buf);
2354         if (len < token_size) {
2355                 memcpy(token, *buf, len);
2356                 *(token + len) = '\0';
2357         }
2358         *buf += len;
2359
2360         return len;
2361 }
2362
2363 /*
2364  * Finds the next token in *buf, dynamically allocates a buffer big
2365  * enough to hold a copy of it, and copies the token into the new
2366  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2367  * that a duplicate buffer is created even for a zero-length token.
2368  *
2369  * Returns a pointer to the newly-allocated duplicate, or a null
2370  * pointer if memory for the duplicate was not available.  If
2371  * the lenp argument is a non-null pointer, the length of the token
2372  * (not including the '\0') is returned in *lenp.
2373  *
2374  * If successful, the *buf pointer will be updated to point beyond
2375  * the end of the found token.
2376  *
2377  * Note: uses GFP_KERNEL for allocation.
2378  */
2379 static inline char *dup_token(const char **buf, size_t *lenp)
2380 {
2381         char *dup;
2382         size_t len;
2383
2384         len = next_token(buf);
2385         dup = kmalloc(len + 1, GFP_KERNEL);
2386         if (!dup)
2387                 return NULL;
2388
2389         memcpy(dup, *buf, len);
2390         *(dup + len) = '\0';
2391         *buf += len;
2392
2393         if (lenp)
2394                 *lenp = len;
2395
2396         return dup;
2397 }
2398
2399 /*
2400  * This fills in the pool_name, image_name, image_name_len, snap_name,
2401  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2402  * on the list of monitor addresses and other options provided via
2403  * /sys/bus/rbd/add.
2404  *
2405  * Note: rbd_dev is assumed to have been initially zero-filled.
2406  */
2407 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2408                               const char *buf,
2409                               const char **mon_addrs,
2410                               size_t *mon_addrs_size,
2411                               char *options,
2412                              size_t options_size)
2413 {
2414         size_t len;
2415         int ret;
2416
2417         /* The first four tokens are required */
2418
2419         len = next_token(&buf);
2420         if (!len)
2421                 return -EINVAL;
2422         *mon_addrs_size = len + 1;
2423         *mon_addrs = buf;
2424
2425         buf += len;
2426
2427         len = copy_token(&buf, options, options_size);
2428         if (!len || len >= options_size)
2429                 return -EINVAL;
2430
2431         ret = -ENOMEM;
2432         rbd_dev->pool_name = dup_token(&buf, NULL);
2433         if (!rbd_dev->pool_name)
2434                 goto out_err;
2435
2436         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2437         if (!rbd_dev->image_name)
2438                 goto out_err;
2439
2440         /* Create the name of the header object */
2441
2442         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2443                                                 + sizeof (RBD_SUFFIX),
2444                                         GFP_KERNEL);
2445         if (!rbd_dev->header_name)
2446                 goto out_err;
2447         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2448
2449         /*
2450          * The snapshot name is optional.  If none is is supplied,
2451          * we use the default value.
2452          */
2453         rbd_dev->snap_name = dup_token(&buf, &len);
2454         if (!rbd_dev->snap_name)
2455                 goto out_err;
2456         if (!len) {
2457                 /* Replace the empty name with the default */
2458                 kfree(rbd_dev->snap_name);
2459                 rbd_dev->snap_name
2460                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2461                 if (!rbd_dev->snap_name)
2462                         goto out_err;
2463
2464                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2465                         sizeof (RBD_SNAP_HEAD_NAME));
2466         }
2467
2468         return 0;
2469
2470 out_err:
2471         kfree(rbd_dev->header_name);
2472         rbd_dev->header_name = NULL;
2473         kfree(rbd_dev->image_name);
2474         rbd_dev->image_name = NULL;
2475         rbd_dev->image_name_len = 0;
2476         kfree(rbd_dev->pool_name);
2477         rbd_dev->pool_name = NULL;
2478
2479         return ret;
2480 }
2481
2482 static ssize_t rbd_add(struct bus_type *bus,
2483                        const char *buf,
2484                        size_t count)
2485 {
2486         char *options;
2487         struct rbd_device *rbd_dev = NULL;
2488         const char *mon_addrs = NULL;
2489         size_t mon_addrs_size = 0;
2490         struct ceph_osd_client *osdc;
2491         int rc = -ENOMEM;
2492
2493         if (!try_module_get(THIS_MODULE))
2494                 return -ENODEV;
2495
2496         options = kmalloc(count, GFP_KERNEL);
2497         if (!options)
2498                 goto err_nomem;
2499         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2500         if (!rbd_dev)
2501                 goto err_nomem;
2502
2503         /* static rbd_device initialization */
2504         spin_lock_init(&rbd_dev->lock);
2505         INIT_LIST_HEAD(&rbd_dev->node);
2506         INIT_LIST_HEAD(&rbd_dev->snaps);
2507         init_rwsem(&rbd_dev->header_rwsem);
2508
2509         /* generate unique id: find highest unique id, add one */
2510         rbd_id_get(rbd_dev);
2511
2512         /* Fill in the device name, now that we have its id. */
2513         BUILD_BUG_ON(DEV_NAME_LEN
2514                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2515         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2516
2517         /* parse add command */
2518         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2519                                 options, count);
2520         if (rc)
2521                 goto err_put_id;
2522
2523         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2524         if (rc < 0)
2525                 goto err_put_id;
2526
2527         /* pick the pool */
2528         osdc = &rbd_dev->rbd_client->client->osdc;
2529         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2530         if (rc < 0)
2531                 goto err_out_client;
2532         rbd_dev->pool_id = rc;
2533
2534         /* register our block device */
2535         rc = register_blkdev(0, rbd_dev->name);
2536         if (rc < 0)
2537                 goto err_out_client;
2538         rbd_dev->major = rc;
2539
2540         rc = rbd_bus_add_dev(rbd_dev);
2541         if (rc)
2542                 goto err_out_blkdev;
2543
2544         /*
2545          * At this point cleanup in the event of an error is the job
2546          * of the sysfs code (initiated by rbd_bus_del_dev()).
2547          *
2548          * Set up and announce blkdev mapping.
2549          */
2550         rc = rbd_init_disk(rbd_dev);
2551         if (rc)
2552                 goto err_out_bus;
2553
2554         rc = rbd_init_watch_dev(rbd_dev);
2555         if (rc)
2556                 goto err_out_bus;
2557
2558         return count;
2559
2560 err_out_bus:
2561         /* this will also clean up rest of rbd_dev stuff */
2562
2563         rbd_bus_del_dev(rbd_dev);
2564         kfree(options);
2565         return rc;
2566
2567 err_out_blkdev:
2568         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2569 err_out_client:
2570         rbd_put_client(rbd_dev);
2571 err_put_id:
2572         if (rbd_dev->pool_name) {
2573                 kfree(rbd_dev->snap_name);
2574                 kfree(rbd_dev->header_name);
2575                 kfree(rbd_dev->image_name);
2576                 kfree(rbd_dev->pool_name);
2577         }
2578         rbd_id_put(rbd_dev);
2579 err_nomem:
2580         kfree(rbd_dev);
2581         kfree(options);
2582
2583         dout("Error adding device %s\n", buf);
2584         module_put(THIS_MODULE);
2585
2586         return (ssize_t) rc;
2587 }
2588
2589 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2590 {
2591         struct list_head *tmp;
2592         struct rbd_device *rbd_dev;
2593
2594         spin_lock(&rbd_dev_list_lock);
2595         list_for_each(tmp, &rbd_dev_list) {
2596                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2597                 if (rbd_dev->dev_id == dev_id) {
2598                         spin_unlock(&rbd_dev_list_lock);
2599                         return rbd_dev;
2600                 }
2601         }
2602         spin_unlock(&rbd_dev_list_lock);
2603         return NULL;
2604 }
2605
2606 static void rbd_dev_release(struct device *dev)
2607 {
2608         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2609
2610         if (rbd_dev->watch_request) {
2611                 struct ceph_client *client = rbd_dev->rbd_client->client;
2612
2613                 ceph_osdc_unregister_linger_request(&client->osdc,
2614                                                     rbd_dev->watch_request);
2615         }
2616         if (rbd_dev->watch_event)
2617                 rbd_req_sync_unwatch(rbd_dev);
2618
2619         rbd_put_client(rbd_dev);
2620
2621         /* clean up and free blkdev */
2622         rbd_free_disk(rbd_dev);
2623         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2624
2625         /* done with the id, and with the rbd_dev */
2626         kfree(rbd_dev->snap_name);
2627         kfree(rbd_dev->header_name);
2628         kfree(rbd_dev->pool_name);
2629         kfree(rbd_dev->image_name);
2630         rbd_id_put(rbd_dev);
2631         kfree(rbd_dev);
2632
2633         /* release module ref */
2634         module_put(THIS_MODULE);
2635 }
2636
2637 static ssize_t rbd_remove(struct bus_type *bus,
2638                           const char *buf,
2639                           size_t count)
2640 {
2641         struct rbd_device *rbd_dev = NULL;
2642         int target_id, rc;
2643         unsigned long ul;
2644         int ret = count;
2645
2646         rc = strict_strtoul(buf, 10, &ul);
2647         if (rc)
2648                 return rc;
2649
2650         /* convert to int; abort if we lost anything in the conversion */
2651         target_id = (int) ul;
2652         if (target_id != ul)
2653                 return -EINVAL;
2654
2655         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2656
2657         rbd_dev = __rbd_get_dev(target_id);
2658         if (!rbd_dev) {
2659                 ret = -ENOENT;
2660                 goto done;
2661         }
2662
2663         __rbd_remove_all_snaps(rbd_dev);
2664         rbd_bus_del_dev(rbd_dev);
2665
2666 done:
2667         mutex_unlock(&ctl_mutex);
2668         return ret;
2669 }
2670
2671 static ssize_t rbd_snap_add(struct device *dev,
2672                             struct device_attribute *attr,
2673                             const char *buf,
2674                             size_t count)
2675 {
2676         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2677         int ret;
2678         char *name = kmalloc(count + 1, GFP_KERNEL);
2679         if (!name)
2680                 return -ENOMEM;
2681
2682         snprintf(name, count, "%s", buf);
2683
2684         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2685
2686         ret = rbd_header_add_snap(rbd_dev,
2687                                   name, GFP_KERNEL);
2688         if (ret < 0)
2689                 goto err_unlock;
2690
2691         ret = __rbd_refresh_header(rbd_dev, NULL);
2692         if (ret < 0)
2693                 goto err_unlock;
2694
2695         /* shouldn't hold ctl_mutex when notifying.. notify might
2696            trigger a watch callback that would need to get that mutex */
2697         mutex_unlock(&ctl_mutex);
2698
2699         /* make a best effort, don't error if failed */
2700         rbd_req_sync_notify(rbd_dev);
2701
2702         ret = count;
2703         kfree(name);
2704         return ret;
2705
2706 err_unlock:
2707         mutex_unlock(&ctl_mutex);
2708         kfree(name);
2709         return ret;
2710 }
2711
2712 /*
2713  * create control files in sysfs
2714  * /sys/bus/rbd/...
2715  */
2716 static int rbd_sysfs_init(void)
2717 {
2718         int ret;
2719
2720         ret = device_register(&rbd_root_dev);
2721         if (ret < 0)
2722                 return ret;
2723
2724         ret = bus_register(&rbd_bus_type);
2725         if (ret < 0)
2726                 device_unregister(&rbd_root_dev);
2727
2728         return ret;
2729 }
2730
2731 static void rbd_sysfs_cleanup(void)
2732 {
2733         bus_unregister(&rbd_bus_type);
2734         device_unregister(&rbd_root_dev);
2735 }
2736
2737 int __init rbd_init(void)
2738 {
2739         int rc;
2740
2741         rc = rbd_sysfs_init();
2742         if (rc)
2743                 return rc;
2744         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2745         return 0;
2746 }
2747
2748 void __exit rbd_exit(void)
2749 {
2750         rbd_sysfs_cleanup();
2751 }
2752
2753 module_init(rbd_init);
2754 module_exit(rbd_exit);
2755
2756 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2757 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2758 MODULE_DESCRIPTION("rados block device");
2759
2760 /* following authorship retained from original osdblk.c */
2761 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2762
2763 MODULE_LICENSE("GPL");