]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: expose the correct size of the device in sysfs
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O is counted in sectors.  The sector shows up in several
 * Linux layers (blk, bio, genhd) and defaults to 512 bytes in all of
 * them; these names just read better than the raw numbers.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * RBD devices are named "rbd#": RBD_DRV_NAME followed by a unique
 * integer id.  MAX_INT_FORMAT_WIDTH is used to make sure DEV_NAME_LEN
 * is large enough for every possible device name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* notify_timeout value used when the option is not given */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u64 snap_seq;
86         u32 total_snaps;
87
88         char *snap_names;
89         u64 *snap_sizes;
90
91         u64 obj_version;
92 };
93
/* rbd-specific options, filled in by parse_rbd_opts_token() */
struct rbd_options {
	int	notify_timeout;		/* "notify_timeout=%d" option */
};
97
98 /*
99  * an instance of the client.  multiple devices may share an rbd client.
100  */
101 struct rbd_client {
102         struct ceph_client      *client;
103         struct rbd_options      *rbd_opts;
104         struct kref             kref;
105         struct list_head        node;
106 };
107
108 /*
109  * a request completion status
110  */
111 struct rbd_req_status {
112         int done;
113         int rc;
114         u64 bytes;
115 };
116
117 /*
118  * a collection of requests
119  */
120 struct rbd_req_coll {
121         int                     total;
122         int                     num_done;
123         struct kref             kref;
124         struct rbd_req_status   status[0];
125 };
126
127 /*
128  * a single io request
129  */
130 struct rbd_request {
131         struct request          *rq;            /* blk layer request */
132         struct bio              *bio;           /* cloned bio */
133         struct page             **pages;        /* list of used pages */
134         u64                     len;
135         int                     coll_index;
136         struct rbd_req_coll     *coll;
137 };
138
139 struct rbd_snap {
140         struct  device          dev;
141         const char              *name;
142         u64                     size;
143         struct list_head        node;
144         u64                     id;
145 };
146
147 /*
148  * a single device
149  */
150 struct rbd_device {
151         int                     id;             /* blkdev unique id */
152
153         int                     major;          /* blkdev assigned major */
154         struct gendisk          *disk;          /* blkdev's gendisk and rq */
155         struct request_queue    *q;
156
157         struct rbd_client       *rbd_client;
158
159         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161         spinlock_t              lock;           /* queue lock */
162
163         struct rbd_image_header header;
164         char                    *image_name;
165         size_t                  image_name_len;
166         char                    *header_name;
167         char                    *pool_name;
168         int                     pool_id;
169
170         struct ceph_osd_event   *watch_event;
171         struct ceph_osd_request *watch_request;
172
173         /* protects updating the header */
174         struct rw_semaphore     header_rwsem;
175         /* name of the snapshot this device reads from */
176         char                    *snap_name;
177         /* id of the snapshot this device reads from */
178         u64                     snap_id;        /* current snapshot id */
179         /* whether the snap_id this device reads from still exists */
180         bool                    snap_exists;
181         int                     read_only;
182
183         struct list_head        node;
184
185         /* list of snapshots */
186         struct list_head        snaps;
187
188         /* sysfs related */
189         struct device           dev;
190 };
191
/* Serializes open/close/setup/teardown */
static DEFINE_MUTEX(ctl_mutex);

/* All rbd devices, and the lock for that list */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients, and the lock for that list */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
199
200 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
201 static void rbd_dev_release(struct device *dev);
202 static ssize_t rbd_snap_add(struct device *dev,
203                             struct device_attribute *attr,
204                             const char *buf,
205                             size_t count);
206 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
207                                   struct rbd_snap *snap);
208
209 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
210                        size_t count);
211 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212                           size_t count);
213
214 static struct bus_attribute rbd_bus_attrs[] = {
215         __ATTR(add, S_IWUSR, NULL, rbd_add),
216         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217         __ATTR_NULL
218 };
219
220 static struct bus_type rbd_bus_type = {
221         .name           = "rbd",
222         .bus_attrs      = rbd_bus_attrs,
223 };
224
225 static void rbd_root_dev_release(struct device *dev)
226 {
227 }
228
229 static struct device rbd_root_dev = {
230         .init_name =    "rbd",
231         .release =      rbd_root_dev_release,
232 };
233
234
235 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236 {
237         return get_device(&rbd_dev->dev);
238 }
239
240 static void rbd_put_dev(struct rbd_device *rbd_dev)
241 {
242         put_device(&rbd_dev->dev);
243 }
244
245 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
246
247 static int rbd_open(struct block_device *bdev, fmode_t mode)
248 {
249         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
250
251         rbd_get_dev(rbd_dev);
252
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
256                 return -EROFS;
257
258         return 0;
259 }
260
261 static int rbd_release(struct gendisk *disk, fmode_t mode)
262 {
263         struct rbd_device *rbd_dev = disk->private_data;
264
265         rbd_put_dev(rbd_dev);
266
267         return 0;
268 }
269
270 static const struct block_device_operations rbd_bd_ops = {
271         .owner                  = THIS_MODULE,
272         .open                   = rbd_open,
273         .release                = rbd_release,
274 };
275
276 /*
277  * Initialize an rbd client instance.
278  * We own *ceph_opts.
279  */
280 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
281                                             struct rbd_options *rbd_opts)
282 {
283         struct rbd_client *rbdc;
284         int ret = -ENOMEM;
285
286         dout("rbd_client_create\n");
287         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288         if (!rbdc)
289                 goto out_opt;
290
291         kref_init(&rbdc->kref);
292         INIT_LIST_HEAD(&rbdc->node);
293
294         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
295
296         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
297         if (IS_ERR(rbdc->client))
298                 goto out_mutex;
299         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
300
301         ret = ceph_open_session(rbdc->client);
302         if (ret < 0)
303                 goto out_err;
304
305         rbdc->rbd_opts = rbd_opts;
306
307         spin_lock(&rbd_client_list_lock);
308         list_add_tail(&rbdc->node, &rbd_client_list);
309         spin_unlock(&rbd_client_list_lock);
310
311         mutex_unlock(&ctl_mutex);
312
313         dout("rbd_client_create created %p\n", rbdc);
314         return rbdc;
315
316 out_err:
317         ceph_destroy_client(rbdc->client);
318 out_mutex:
319         mutex_unlock(&ctl_mutex);
320         kfree(rbdc);
321 out_opt:
322         if (ceph_opts)
323                 ceph_destroy_options(ceph_opts);
324         return ERR_PTR(ret);
325 }
326
327 /*
328  * Find a ceph client with specific addr and configuration.
329  */
330 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
331 {
332         struct rbd_client *client_node;
333
334         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
335                 return NULL;
336
337         list_for_each_entry(client_node, &rbd_client_list, node)
338                 if (!ceph_compare_options(ceph_opts, client_node->client))
339                         return client_node;
340         return NULL;
341 }
342
343 /*
344  * mount options
345  */
346 enum {
347         Opt_notify_timeout,
348         Opt_last_int,
349         /* int args above */
350         Opt_last_string,
351         /* string args above */
352 };
353
354 static match_table_t rbd_opts_tokens = {
355         {Opt_notify_timeout, "notify_timeout=%d"},
356         /* int args above */
357         /* string args above */
358         {-1, NULL}
359 };
360
361 static int parse_rbd_opts_token(char *c, void *private)
362 {
363         struct rbd_options *rbd_opts = private;
364         substring_t argstr[MAX_OPT_ARGS];
365         int token, intval, ret;
366
367         token = match_token(c, rbd_opts_tokens, argstr);
368         if (token < 0)
369                 return -EINVAL;
370
371         if (token < Opt_last_int) {
372                 ret = match_int(&argstr[0], &intval);
373                 if (ret < 0) {
374                         pr_err("bad mount option arg (not int) "
375                                "at '%s'\n", c);
376                         return ret;
377                 }
378                 dout("got int token %d val %d\n", token, intval);
379         } else if (token > Opt_last_int && token < Opt_last_string) {
380                 dout("got string token %d val %s\n", token,
381                      argstr[0].from);
382         } else {
383                 dout("got token %d\n", token);
384         }
385
386         switch (token) {
387         case Opt_notify_timeout:
388                 rbd_opts->notify_timeout = intval;
389                 break;
390         default:
391                 BUG_ON(token);
392         }
393         return 0;
394 }
395
396 /*
397  * Get a ceph client with specific addr and configuration, if one does
398  * not exist create it.
399  */
400 static struct rbd_client *rbd_get_client(const char *mon_addr,
401                                          size_t mon_addr_len,
402                                          char *options)
403 {
404         struct rbd_client *rbdc;
405         struct ceph_options *ceph_opts;
406         struct rbd_options *rbd_opts;
407
408         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
409         if (!rbd_opts)
410                 return ERR_PTR(-ENOMEM);
411
412         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
413
414         ceph_opts = ceph_parse_options(options, mon_addr,
415                                         mon_addr + mon_addr_len,
416                                         parse_rbd_opts_token, rbd_opts);
417         if (IS_ERR(ceph_opts)) {
418                 kfree(rbd_opts);
419                 return ERR_CAST(ceph_opts);
420         }
421
422         spin_lock(&rbd_client_list_lock);
423         rbdc = __rbd_client_find(ceph_opts);
424         if (rbdc) {
425                 /* using an existing client */
426                 kref_get(&rbdc->kref);
427                 spin_unlock(&rbd_client_list_lock);
428
429                 ceph_destroy_options(ceph_opts);
430                 kfree(rbd_opts);
431
432                 return rbdc;
433         }
434         spin_unlock(&rbd_client_list_lock);
435
436         rbdc = rbd_client_create(ceph_opts, rbd_opts);
437
438         if (IS_ERR(rbdc))
439                 kfree(rbd_opts);
440
441         return rbdc;
442 }
443
444 /*
445  * Destroy ceph client
446  *
447  * Caller must hold rbd_client_list_lock.
448  */
449 static void rbd_client_release(struct kref *kref)
450 {
451         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452
453         dout("rbd_release_client %p\n", rbdc);
454         spin_lock(&rbd_client_list_lock);
455         list_del(&rbdc->node);
456         spin_unlock(&rbd_client_list_lock);
457
458         ceph_destroy_client(rbdc->client);
459         kfree(rbdc->rbd_opts);
460         kfree(rbdc);
461 }
462
463 /*
464  * Drop reference to ceph client node. If it's not referenced anymore, release
465  * it.
466  */
467 static void rbd_put_client(struct rbd_device *rbd_dev)
468 {
469         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470         rbd_dev->rbd_client = NULL;
471 }
472
473 /*
474  * Destroy requests collection
475  */
476 static void rbd_coll_release(struct kref *kref)
477 {
478         struct rbd_req_coll *coll =
479                 container_of(kref, struct rbd_req_coll, kref);
480
481         dout("rbd_coll_release %p\n", coll);
482         kfree(coll);
483 }
484
485 /*
486  * Create a new header structure, translate header format from the on-disk
487  * header.
488  */
489 static int rbd_header_from_disk(struct rbd_image_header *header,
490                                  struct rbd_image_header_ondisk *ondisk,
491                                  u32 allocated_snaps,
492                                  gfp_t gfp_flags)
493 {
494         u32 i, snap_count;
495
496         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
497                 return -ENXIO;
498
499         snap_count = le32_to_cpu(ondisk->snap_count);
500         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
501                          / sizeof (*ondisk))
502                 return -EINVAL;
503         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
504                                 snap_count * sizeof(u64),
505                                 gfp_flags);
506         if (!header->snapc)
507                 return -ENOMEM;
508
509         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
510         if (snap_count) {
511                 header->snap_names = kmalloc(header->snap_names_len,
512                                              gfp_flags);
513                 if (!header->snap_names)
514                         goto err_snapc;
515                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
516                                              gfp_flags);
517                 if (!header->snap_sizes)
518                         goto err_names;
519         } else {
520                 header->snap_names = NULL;
521                 header->snap_sizes = NULL;
522         }
523
524         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
525                                         gfp_flags);
526         if (!header->object_prefix)
527                 goto err_sizes;
528
529         memcpy(header->object_prefix, ondisk->block_name,
530                sizeof(ondisk->block_name));
531         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
532
533         header->image_size = le64_to_cpu(ondisk->image_size);
534         header->obj_order = ondisk->options.order;
535         header->crypt_type = ondisk->options.crypt_type;
536         header->comp_type = ondisk->options.comp_type;
537
538         atomic_set(&header->snapc->nref, 1);
539         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
540         header->snapc->num_snaps = snap_count;
541         header->total_snaps = snap_count;
542
543         if (snap_count && allocated_snaps == snap_count) {
544                 for (i = 0; i < snap_count; i++) {
545                         header->snapc->snaps[i] =
546                                 le64_to_cpu(ondisk->snaps[i].id);
547                         header->snap_sizes[i] =
548                                 le64_to_cpu(ondisk->snaps[i].image_size);
549                 }
550
551                 /* copy snapshot names */
552                 memcpy(header->snap_names, &ondisk->snaps[i],
553                         header->snap_names_len);
554         }
555
556         return 0;
557
558 err_sizes:
559         kfree(header->snap_sizes);
560 err_names:
561         kfree(header->snap_names);
562 err_snapc:
563         kfree(header->snapc);
564         return -ENOMEM;
565 }
566
567 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
568                         u64 *seq, u64 *size)
569 {
570         int i;
571         char *p = header->snap_names;
572
573         for (i = 0; i < header->total_snaps; i++) {
574                 if (!strcmp(snap_name, p)) {
575
576                         /* Found it.  Pass back its id and/or size */
577
578                         if (seq)
579                                 *seq = header->snapc->snaps[i];
580                         if (size)
581                                 *size = header->snap_sizes[i];
582                         return i;
583                 }
584                 p += strlen(p) + 1;     /* Skip ahead to the next name */
585         }
586         return -ENOENT;
587 }
588
589 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
590 {
591         struct rbd_image_header *header = &rbd_dev->header;
592         struct ceph_snap_context *snapc = header->snapc;
593         int ret = -ENOENT;
594
595         down_write(&rbd_dev->header_rwsem);
596
597         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
598                     sizeof (RBD_SNAP_HEAD_NAME))) {
599                 if (header->total_snaps)
600                         snapc->seq = header->snap_seq;
601                 else
602                         snapc->seq = 0;
603                 rbd_dev->snap_id = CEPH_NOSNAP;
604                 rbd_dev->snap_exists = false;
605                 rbd_dev->read_only = 0;
606                 if (size)
607                         *size = header->image_size;
608         } else {
609                 ret = snap_by_name(header, rbd_dev->snap_name,
610                                         &snapc->seq, size);
611                 if (ret < 0)
612                         goto done;
613                 rbd_dev->snap_id = snapc->seq;
614                 rbd_dev->snap_exists = true;
615                 rbd_dev->read_only = 1;
616         }
617
618         ret = 0;
619 done:
620         up_write(&rbd_dev->header_rwsem);
621         return ret;
622 }
623
624 static void rbd_header_free(struct rbd_image_header *header)
625 {
626         kfree(header->object_prefix);
627         kfree(header->snap_sizes);
628         kfree(header->snap_names);
629         kfree(header->snapc);
630 }
631
632 /*
633  * get the actual striped segment name, offset and length
634  */
635 static u64 rbd_get_segment(struct rbd_image_header *header,
636                            const char *object_prefix,
637                            u64 ofs, u64 len,
638                            char *seg_name, u64 *segofs)
639 {
640         u64 seg = ofs >> header->obj_order;
641
642         if (seg_name)
643                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
644                          "%s.%012llx", object_prefix, seg);
645
646         ofs = ofs & ((1 << header->obj_order) - 1);
647         len = min_t(u64, len, (1 << header->obj_order) - ofs);
648
649         if (segofs)
650                 *segofs = ofs;
651
652         return len;
653 }
654
655 static int rbd_get_num_segments(struct rbd_image_header *header,
656                                 u64 ofs, u64 len)
657 {
658         u64 start_seg = ofs >> header->obj_order;
659         u64 end_seg = (ofs + len - 1) >> header->obj_order;
660         return end_seg - start_seg + 1;
661 }
662
663 /*
664  * returns the size of an object in the image
665  */
666 static u64 rbd_obj_bytes(struct rbd_image_header *header)
667 {
668         return 1 << header->obj_order;
669 }
670
671 /*
672  * bio helpers
673  */
674
675 static void bio_chain_put(struct bio *chain)
676 {
677         struct bio *tmp;
678
679         while (chain) {
680                 tmp = chain;
681                 chain = chain->bi_next;
682                 bio_put(tmp);
683         }
684 }
685
686 /*
687  * zeros a bio chain, starting at specific offset
688  */
689 static void zero_bio_chain(struct bio *chain, int start_ofs)
690 {
691         struct bio_vec *bv;
692         unsigned long flags;
693         void *buf;
694         int i;
695         int pos = 0;
696
697         while (chain) {
698                 bio_for_each_segment(bv, chain, i) {
699                         if (pos + bv->bv_len > start_ofs) {
700                                 int remainder = max(start_ofs - pos, 0);
701                                 buf = bvec_kmap_irq(bv, &flags);
702                                 memset(buf + remainder, 0,
703                                        bv->bv_len - remainder);
704                                 bvec_kunmap_irq(buf, &flags);
705                         }
706                         pos += bv->bv_len;
707                 }
708
709                 chain = chain->bi_next;
710         }
711 }
712
713 /*
714  * bio_chain_clone - clone a chain of bios up to a certain length.
715  * might return a bio_pair that will need to be released.
716  */
717 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
718                                    struct bio_pair **bp,
719                                    int len, gfp_t gfpmask)
720 {
721         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
722         int total = 0;
723
724         if (*bp) {
725                 bio_pair_release(*bp);
726                 *bp = NULL;
727         }
728
729         while (old_chain && (total < len)) {
730                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
731                 if (!tmp)
732                         goto err_out;
733
734                 if (total + old_chain->bi_size > len) {
735                         struct bio_pair *bp;
736
737                         /*
738                          * this split can only happen with a single paged bio,
739                          * split_bio will BUG_ON if this is not the case
740                          */
741                         dout("bio_chain_clone split! total=%d remaining=%d"
742                              "bi_size=%d\n",
743                              (int)total, (int)len-total,
744                              (int)old_chain->bi_size);
745
746                         /* split the bio. We'll release it either in the next
747                            call, or it will have to be released outside */
748                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
749                         if (!bp)
750                                 goto err_out;
751
752                         __bio_clone(tmp, &bp->bio1);
753
754                         *next = &bp->bio2;
755                 } else {
756                         __bio_clone(tmp, old_chain);
757                         *next = old_chain->bi_next;
758                 }
759
760                 tmp->bi_bdev = NULL;
761                 gfpmask &= ~__GFP_WAIT;
762                 tmp->bi_next = NULL;
763
764                 if (!new_chain) {
765                         new_chain = tail = tmp;
766                 } else {
767                         tail->bi_next = tmp;
768                         tail = tmp;
769                 }
770                 old_chain = old_chain->bi_next;
771
772                 total += tmp->bi_size;
773         }
774
775         BUG_ON(total < len);
776
777         if (tail)
778                 tail->bi_next = NULL;
779
780         *old = old_chain;
781
782         return new_chain;
783
784 err_out:
785         dout("bio_chain_clone with err\n");
786         bio_chain_put(new_chain);
787         return NULL;
788 }
789
790 /*
791  * helpers for osd request op vectors.
792  */
793 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
794                             int num_ops,
795                             int opcode,
796                             u32 payload_len)
797 {
798         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
799                        GFP_NOIO);
800         if (!*ops)
801                 return -ENOMEM;
802         (*ops)[0].op = opcode;
803         /*
804          * op extent offset and length will be set later on
805          * in calc_raw_layout()
806          */
807         (*ops)[0].payload_len = payload_len;
808         return 0;
809 }
810
/* Free an op vector obtained from rbd_create_rw_ops(); NULL is fine. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
815
816 static void rbd_coll_end_req_index(struct request *rq,
817                                    struct rbd_req_coll *coll,
818                                    int index,
819                                    int ret, u64 len)
820 {
821         struct request_queue *q;
822         int min, max, i;
823
824         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
825              coll, index, ret, len);
826
827         if (!rq)
828                 return;
829
830         if (!coll) {
831                 blk_end_request(rq, ret, len);
832                 return;
833         }
834
835         q = rq->q;
836
837         spin_lock_irq(q->queue_lock);
838         coll->status[index].done = 1;
839         coll->status[index].rc = ret;
840         coll->status[index].bytes = len;
841         max = min = coll->num_done;
842         while (max < coll->total && coll->status[max].done)
843                 max++;
844
845         for (i = min; i<max; i++) {
846                 __blk_end_request(rq, coll->status[i].rc,
847                                   coll->status[i].bytes);
848                 coll->num_done++;
849                 kref_put(&coll->kref, rbd_coll_release);
850         }
851         spin_unlock_irq(q->queue_lock);
852 }
853
854 static void rbd_coll_end_req(struct rbd_request *req,
855                              int ret, u64 len)
856 {
857         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
858 }
859
860 /*
861  * Send ceph osd request
862  */
863 static int rbd_do_request(struct request *rq,
864                           struct rbd_device *rbd_dev,
865                           struct ceph_snap_context *snapc,
866                           u64 snapid,
867                           const char *object_name, u64 ofs, u64 len,
868                           struct bio *bio,
869                           struct page **pages,
870                           int num_pages,
871                           int flags,
872                           struct ceph_osd_req_op *ops,
873                           struct rbd_req_coll *coll,
874                           int coll_index,
875                           void (*rbd_cb)(struct ceph_osd_request *req,
876                                          struct ceph_msg *msg),
877                           struct ceph_osd_request **linger_req,
878                           u64 *ver)
879 {
880         struct ceph_osd_request *req;
881         struct ceph_file_layout *layout;
882         int ret;
883         u64 bno;
884         struct timespec mtime = CURRENT_TIME;
885         struct rbd_request *req_data;
886         struct ceph_osd_request_head *reqhead;
887         struct ceph_osd_client *osdc;
888
889         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
890         if (!req_data) {
891                 if (coll)
892                         rbd_coll_end_req_index(rq, coll, coll_index,
893                                                -ENOMEM, len);
894                 return -ENOMEM;
895         }
896
897         if (coll) {
898                 req_data->coll = coll;
899                 req_data->coll_index = coll_index;
900         }
901
902         dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
903                 object_name, len, ofs);
904
905         down_read(&rbd_dev->header_rwsem);
906
907         osdc = &rbd_dev->rbd_client->client->osdc;
908         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
909                                         false, GFP_NOIO, pages, bio);
910         if (!req) {
911                 up_read(&rbd_dev->header_rwsem);
912                 ret = -ENOMEM;
913                 goto done_pages;
914         }
915
916         req->r_callback = rbd_cb;
917
918         req_data->rq = rq;
919         req_data->bio = bio;
920         req_data->pages = pages;
921         req_data->len = len;
922
923         req->r_priv = req_data;
924
925         reqhead = req->r_request->front.iov_base;
926         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
927
928         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
929         req->r_oid_len = strlen(req->r_oid);
930
931         layout = &req->r_file_layout;
932         memset(layout, 0, sizeof(*layout));
933         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
934         layout->fl_stripe_count = cpu_to_le32(1);
935         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
936         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
937         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
938                                 req, ops);
939
940         ceph_osdc_build_request(req, ofs, &len,
941                                 ops,
942                                 snapc,
943                                 &mtime,
944                                 req->r_oid, req->r_oid_len);
945         up_read(&rbd_dev->header_rwsem);
946
947         if (linger_req) {
948                 ceph_osdc_set_request_linger(osdc, req);
949                 *linger_req = req;
950         }
951
952         ret = ceph_osdc_start_request(osdc, req, false);
953         if (ret < 0)
954                 goto done_err;
955
956         if (!rbd_cb) {
957                 ret = ceph_osdc_wait_request(osdc, req);
958                 if (ver)
959                         *ver = le64_to_cpu(req->r_reassert_version.version);
960                 dout("reassert_ver=%lld\n",
961                      le64_to_cpu(req->r_reassert_version.version));
962                 ceph_osdc_put_request(req);
963         }
964         return ret;
965
966 done_err:
967         bio_chain_put(req_data->bio);
968         ceph_osdc_put_request(req);
969 done_pages:
970         rbd_coll_end_req(req_data, ret, len);
971         kfree(req_data);
972         return ret;
973 }
974
975 /*
976  * Ceph osd op callback
977  */
978 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
979 {
980         struct rbd_request *req_data = req->r_priv;
981         struct ceph_osd_reply_head *replyhead;
982         struct ceph_osd_op *op;
983         __s32 rc;
984         u64 bytes;
985         int read_op;
986
987         /* parse reply */
988         replyhead = msg->front.iov_base;
989         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
990         op = (void *)(replyhead + 1);
991         rc = le32_to_cpu(replyhead->result);
992         bytes = le64_to_cpu(op->extent.length);
993         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
994
995         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
996
997         if (rc == -ENOENT && read_op) {
998                 zero_bio_chain(req_data->bio, 0);
999                 rc = 0;
1000         } else if (rc == 0 && read_op && bytes < req_data->len) {
1001                 zero_bio_chain(req_data->bio, bytes);
1002                 bytes = req_data->len;
1003         }
1004
1005         rbd_coll_end_req(req_data, rc, bytes);
1006
1007         if (req_data->bio)
1008                 bio_chain_put(req_data->bio);
1009
1010         ceph_osdc_put_request(req);
1011         kfree(req_data);
1012 }
1013
1014 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1015 {
1016         ceph_osdc_put_request(req);
1017 }
1018
1019 /*
1020  * Do a synchronous ceph osd operation
1021  */
1022 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1023                            struct ceph_snap_context *snapc,
1024                            u64 snapid,
1025                            int opcode,
1026                            int flags,
1027                            struct ceph_osd_req_op *orig_ops,
1028                            const char *object_name,
1029                            u64 ofs, u64 len,
1030                            char *buf,
1031                            struct ceph_osd_request **linger_req,
1032                            u64 *ver)
1033 {
1034         int ret;
1035         struct page **pages;
1036         int num_pages;
1037         struct ceph_osd_req_op *ops = orig_ops;
1038         u32 payload_len;
1039
1040         num_pages = calc_pages_for(ofs , len);
1041         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1042         if (IS_ERR(pages))
1043                 return PTR_ERR(pages);
1044
1045         if (!orig_ops) {
1046                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1047                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1048                 if (ret < 0)
1049                         goto done;
1050
1051                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1052                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1053                         if (ret < 0)
1054                                 goto done_ops;
1055                 }
1056         }
1057
1058         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1059                           object_name, ofs, len, NULL,
1060                           pages, num_pages,
1061                           flags,
1062                           ops,
1063                           NULL, 0,
1064                           NULL,
1065                           linger_req, ver);
1066         if (ret < 0)
1067                 goto done_ops;
1068
1069         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071
1072 done_ops:
1073         if (!orig_ops)
1074                 rbd_destroy_ops(ops);
1075 done:
1076         ceph_release_page_vector(pages, num_pages);
1077         return ret;
1078 }
1079
1080 /*
1081  * Do an asynchronous ceph osd operation
1082  */
1083 static int rbd_do_op(struct request *rq,
1084                      struct rbd_device *rbd_dev,
1085                      struct ceph_snap_context *snapc,
1086                      u64 snapid,
1087                      int opcode, int flags,
1088                      u64 ofs, u64 len,
1089                      struct bio *bio,
1090                      struct rbd_req_coll *coll,
1091                      int coll_index)
1092 {
1093         char *seg_name;
1094         u64 seg_ofs;
1095         u64 seg_len;
1096         int ret;
1097         struct ceph_osd_req_op *ops;
1098         u32 payload_len;
1099
1100         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101         if (!seg_name)
1102                 return -ENOMEM;
1103
1104         seg_len = rbd_get_segment(&rbd_dev->header,
1105                                   rbd_dev->header.object_prefix,
1106                                   ofs, len,
1107                                   seg_name, &seg_ofs);
1108
1109         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1110
1111         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1112         if (ret < 0)
1113                 goto done;
1114
1115         /* we've taken care of segment sizes earlier when we
1116            cloned the bios. We should never have a segment
1117            truncated at this point */
1118         BUG_ON(seg_len < len);
1119
1120         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121                              seg_name, seg_ofs, seg_len,
1122                              bio,
1123                              NULL, 0,
1124                              flags,
1125                              ops,
1126                              coll, coll_index,
1127                              rbd_req_cb, 0, NULL);
1128
1129         rbd_destroy_ops(ops);
1130 done:
1131         kfree(seg_name);
1132         return ret;
1133 }
1134
1135 /*
1136  * Request async osd write
1137  */
1138 static int rbd_req_write(struct request *rq,
1139                          struct rbd_device *rbd_dev,
1140                          struct ceph_snap_context *snapc,
1141                          u64 ofs, u64 len,
1142                          struct bio *bio,
1143                          struct rbd_req_coll *coll,
1144                          int coll_index)
1145 {
1146         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1147                          CEPH_OSD_OP_WRITE,
1148                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1149                          ofs, len, bio, coll, coll_index);
1150 }
1151
1152 /*
1153  * Request async osd read
1154  */
1155 static int rbd_req_read(struct request *rq,
1156                          struct rbd_device *rbd_dev,
1157                          u64 snapid,
1158                          u64 ofs, u64 len,
1159                          struct bio *bio,
1160                          struct rbd_req_coll *coll,
1161                          int coll_index)
1162 {
1163         return rbd_do_op(rq, rbd_dev, NULL,
1164                          snapid,
1165                          CEPH_OSD_OP_READ,
1166                          CEPH_OSD_FLAG_READ,
1167                          ofs, len, bio, coll, coll_index);
1168 }
1169
1170 /*
1171  * Request sync osd read
1172  */
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1174                           struct ceph_snap_context *snapc,
1175                           u64 snapid,
1176                           const char *object_name,
1177                           u64 ofs, u64 len,
1178                           char *buf,
1179                           u64 *ver)
1180 {
1181         return rbd_req_sync_op(rbd_dev, NULL,
1182                                snapid,
1183                                CEPH_OSD_OP_READ,
1184                                CEPH_OSD_FLAG_READ,
1185                                NULL,
1186                                object_name, ofs, len, buf, NULL, ver);
1187 }
1188
1189 /*
1190  * Request sync osd watch
1191  */
1192 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1193                                    u64 ver,
1194                                    u64 notify_id,
1195                                    const char *object_name)
1196 {
1197         struct ceph_osd_req_op *ops;
1198         int ret;
1199
1200         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1201         if (ret < 0)
1202                 return ret;
1203
1204         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1205         ops[0].watch.cookie = notify_id;
1206         ops[0].watch.flag = 0;
1207
1208         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1209                           object_name, 0, 0, NULL,
1210                           NULL, 0,
1211                           CEPH_OSD_FLAG_READ,
1212                           ops,
1213                           NULL, 0,
1214                           rbd_simple_req_cb, 0, NULL);
1215
1216         rbd_destroy_ops(ops);
1217         return ret;
1218 }
1219
1220 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1221 {
1222         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1223         int rc;
1224
1225         if (!rbd_dev)
1226                 return;
1227
1228         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
1229                 rbd_dev->header_name, notify_id, (int) opcode);
1230         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1231         rc = __rbd_refresh_header(rbd_dev);
1232         mutex_unlock(&ctl_mutex);
1233         if (rc)
1234                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1235                            " update snaps: %d\n", rbd_dev->major, rc);
1236
1237         rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->header_name);
1238 }
1239
1240 /*
1241  * Request sync osd watch
1242  */
1243 static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
1244                               const char *object_name,
1245                               u64 ver)
1246 {
1247         struct ceph_osd_req_op *ops;
1248         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1249
1250         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1251         if (ret < 0)
1252                 return ret;
1253
1254         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1255                                      (void *)rbd_dev, &rbd_dev->watch_event);
1256         if (ret < 0)
1257                 goto fail;
1258
1259         ops[0].watch.ver = cpu_to_le64(ver);
1260         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1261         ops[0].watch.flag = 1;
1262
1263         ret = rbd_req_sync_op(rbd_dev, NULL,
1264                               CEPH_NOSNAP,
1265                               0,
1266                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1267                               ops,
1268                               object_name, 0, 0, NULL,
1269                               &rbd_dev->watch_request, NULL);
1270
1271         if (ret < 0)
1272                 goto fail_event;
1273
1274         rbd_destroy_ops(ops);
1275         return 0;
1276
1277 fail_event:
1278         ceph_osdc_cancel_event(rbd_dev->watch_event);
1279         rbd_dev->watch_event = NULL;
1280 fail:
1281         rbd_destroy_ops(ops);
1282         return ret;
1283 }
1284
1285 /*
1286  * Request sync osd unwatch
1287  */
1288 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
1289                                 const char *object_name)
1290 {
1291         struct ceph_osd_req_op *ops;
1292
1293         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1294         if (ret < 0)
1295                 return ret;
1296
1297         ops[0].watch.ver = 0;
1298         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1299         ops[0].watch.flag = 0;
1300
1301         ret = rbd_req_sync_op(rbd_dev, NULL,
1302                               CEPH_NOSNAP,
1303                               0,
1304                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1305                               ops,
1306                               object_name, 0, 0, NULL, NULL, NULL);
1307
1308         rbd_destroy_ops(ops);
1309         ceph_osdc_cancel_event(rbd_dev->watch_event);
1310         rbd_dev->watch_event = NULL;
1311         return ret;
1312 }
1313
1314 struct rbd_notify_info {
1315         struct rbd_device *rbd_dev;
1316 };
1317
1318 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1319 {
1320         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1321         if (!rbd_dev)
1322                 return;
1323
1324         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1325                                 rbd_dev->header_name,
1326                 notify_id, (int)opcode);
1327 }
1328
1329 /*
1330  * Request sync osd notify
1331  */
1332 static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1333                           const char *object_name)
1334 {
1335         struct ceph_osd_req_op *ops;
1336         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1337         struct ceph_osd_event *event;
1338         struct rbd_notify_info info;
1339         int payload_len = sizeof(u32) + sizeof(u32);
1340         int ret;
1341
1342         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1343         if (ret < 0)
1344                 return ret;
1345
1346         info.rbd_dev = rbd_dev;
1347
1348         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1349                                      (void *)&info, &event);
1350         if (ret < 0)
1351                 goto fail;
1352
1353         ops[0].watch.ver = 1;
1354         ops[0].watch.flag = 1;
1355         ops[0].watch.cookie = event->cookie;
1356         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1357         ops[0].watch.timeout = 12;
1358
1359         ret = rbd_req_sync_op(rbd_dev, NULL,
1360                                CEPH_NOSNAP,
1361                                0,
1362                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363                                ops,
1364                                object_name, 0, 0, NULL, NULL, NULL);
1365         if (ret < 0)
1366                 goto fail_event;
1367
1368         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1369         dout("ceph_osdc_wait_event returned %d\n", ret);
1370         rbd_destroy_ops(ops);
1371         return 0;
1372
1373 fail_event:
1374         ceph_osdc_cancel_event(event);
1375 fail:
1376         rbd_destroy_ops(ops);
1377         return ret;
1378 }
1379
1380 /*
1381  * Request sync osd read
1382  */
1383 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1384                              const char *object_name,
1385                              const char *class_name,
1386                              const char *method_name,
1387                              const char *data,
1388                              int len,
1389                              u64 *ver)
1390 {
1391         struct ceph_osd_req_op *ops;
1392         int class_name_len = strlen(class_name);
1393         int method_name_len = strlen(method_name);
1394         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1395                                     class_name_len + method_name_len + len);
1396         if (ret < 0)
1397                 return ret;
1398
1399         ops[0].cls.class_name = class_name;
1400         ops[0].cls.class_len = (__u8) class_name_len;
1401         ops[0].cls.method_name = method_name;
1402         ops[0].cls.method_len = (__u8) method_name_len;
1403         ops[0].cls.argc = 0;
1404         ops[0].cls.indata = data;
1405         ops[0].cls.indata_len = len;
1406
1407         ret = rbd_req_sync_op(rbd_dev, NULL,
1408                                CEPH_NOSNAP,
1409                                0,
1410                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1411                                ops,
1412                                object_name, 0, 0, NULL, NULL, ver);
1413
1414         rbd_destroy_ops(ops);
1415
1416         dout("cls_exec returned %d\n", ret);
1417         return ret;
1418 }
1419
1420 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1421 {
1422         struct rbd_req_coll *coll =
1423                         kzalloc(sizeof(struct rbd_req_coll) +
1424                                 sizeof(struct rbd_req_status) * num_reqs,
1425                                 GFP_ATOMIC);
1426
1427         if (!coll)
1428                 return NULL;
1429         coll->total = num_reqs;
1430         kref_init(&coll->kref);
1431         return coll;
1432 }
1433
1434 /*
1435  * block device queue callback
1436  */
1437 static void rbd_rq_fn(struct request_queue *q)
1438 {
1439         struct rbd_device *rbd_dev = q->queuedata;
1440         struct request *rq;
1441         struct bio_pair *bp = NULL;
1442
1443         while ((rq = blk_fetch_request(q))) {
1444                 struct bio *bio;
1445                 struct bio *rq_bio, *next_bio = NULL;
1446                 bool do_write;
1447                 int size, op_size = 0;
1448                 u64 ofs;
1449                 int num_segs, cur_seg = 0;
1450                 struct rbd_req_coll *coll;
1451
1452                 /* peek at request from block layer */
1453                 if (!rq)
1454                         break;
1455
1456                 dout("fetched request\n");
1457
1458                 /* filter out block requests we don't understand */
1459                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1460                         __blk_end_request_all(rq, 0);
1461                         continue;
1462                 }
1463
1464                 /* deduce our operation (read, write) */
1465                 do_write = (rq_data_dir(rq) == WRITE);
1466
1467                 size = blk_rq_bytes(rq);
1468                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1469                 rq_bio = rq->bio;
1470                 if (do_write && rbd_dev->read_only) {
1471                         __blk_end_request_all(rq, -EROFS);
1472                         continue;
1473                 }
1474
1475                 spin_unlock_irq(q->queue_lock);
1476
1477                 if (rbd_dev->snap_id != CEPH_NOSNAP) {
1478                         bool snap_exists;
1479
1480                         down_read(&rbd_dev->header_rwsem);
1481                         snap_exists = rbd_dev->snap_exists;
1482                         up_read(&rbd_dev->header_rwsem);
1483
1484                         if (!snap_exists) {
1485                                 dout("request for non-existent snapshot");
1486                                 spin_lock_irq(q->queue_lock);
1487                                 __blk_end_request_all(rq, -ENXIO);
1488                                 continue;
1489                         }
1490                 }
1491
1492                 dout("%s 0x%x bytes at 0x%llx\n",
1493                      do_write ? "write" : "read",
1494                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1495
1496                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1497                 coll = rbd_alloc_coll(num_segs);
1498                 if (!coll) {
1499                         spin_lock_irq(q->queue_lock);
1500                         __blk_end_request_all(rq, -ENOMEM);
1501                         continue;
1502                 }
1503
1504                 do {
1505                         /* a bio clone to be passed down to OSD req */
1506                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1507                         op_size = rbd_get_segment(&rbd_dev->header,
1508                                                   rbd_dev->header.object_prefix,
1509                                                   ofs, size,
1510                                                   NULL, NULL);
1511                         kref_get(&coll->kref);
1512                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1513                                               op_size, GFP_ATOMIC);
1514                         if (!bio) {
1515                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1516                                                        -ENOMEM, op_size);
1517                                 goto next_seg;
1518                         }
1519
1520
1521                         /* init OSD command: write or read */
1522                         if (do_write)
1523                                 rbd_req_write(rq, rbd_dev,
1524                                               rbd_dev->header.snapc,
1525                                               ofs,
1526                                               op_size, bio,
1527                                               coll, cur_seg);
1528                         else
1529                                 rbd_req_read(rq, rbd_dev,
1530                                              rbd_dev->snap_id,
1531                                              ofs,
1532                                              op_size, bio,
1533                                              coll, cur_seg);
1534
1535 next_seg:
1536                         size -= op_size;
1537                         ofs += op_size;
1538
1539                         cur_seg++;
1540                         rq_bio = next_bio;
1541                 } while (size > 0);
1542                 kref_put(&coll->kref, rbd_coll_release);
1543
1544                 if (bp)
1545                         bio_pair_release(bp);
1546                 spin_lock_irq(q->queue_lock);
1547         }
1548 }
1549
1550 /*
1551  * a queue callback. Makes sure that we don't create a bio that spans across
1552  * multiple osd objects. One exception would be with a single page bios,
1553  * which we handle later at bio_chain_clone
1554  */
1555 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1556                           struct bio_vec *bvec)
1557 {
1558         struct rbd_device *rbd_dev = q->queuedata;
1559         unsigned int chunk_sectors;
1560         sector_t sector;
1561         unsigned int bio_sectors;
1562         int max;
1563
1564         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1565         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1566         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1567
1568         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1569                                  + bio_sectors)) << SECTOR_SHIFT;
1570         if (max < 0)
1571                 max = 0; /* bio_add cannot handle a negative return */
1572         if (max <= bvec->bv_len && bio_sectors == 0)
1573                 return bvec->bv_len;
1574         return max;
1575 }
1576
1577 static void rbd_free_disk(struct rbd_device *rbd_dev)
1578 {
1579         struct gendisk *disk = rbd_dev->disk;
1580
1581         if (!disk)
1582                 return;
1583
1584         rbd_header_free(&rbd_dev->header);
1585
1586         if (disk->flags & GENHD_FL_UP)
1587                 del_gendisk(disk);
1588         if (disk->queue)
1589                 blk_cleanup_queue(disk->queue);
1590         put_disk(disk);
1591 }
1592
1593 /*
1594  * reload the ondisk the header 
1595  */
1596 static int rbd_read_header(struct rbd_device *rbd_dev,
1597                            struct rbd_image_header *header)
1598 {
1599         ssize_t rc;
1600         struct rbd_image_header_ondisk *dh;
1601         u32 snap_count = 0;
1602         u64 ver;
1603         size_t len;
1604
1605         /*
1606          * First reads the fixed-size header to determine the number
1607          * of snapshots, then re-reads it, along with all snapshot
1608          * records as well as their stored names.
1609          */
1610         len = sizeof (*dh);
1611         while (1) {
1612                 dh = kmalloc(len, GFP_KERNEL);
1613                 if (!dh)
1614                         return -ENOMEM;
1615
1616                 rc = rbd_req_sync_read(rbd_dev,
1617                                        NULL, CEPH_NOSNAP,
1618                                        rbd_dev->header_name,
1619                                        0, len,
1620                                        (char *)dh, &ver);
1621                 if (rc < 0)
1622                         goto out_dh;
1623
1624                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1625                 if (rc < 0) {
1626                         if (rc == -ENXIO)
1627                                 pr_warning("unrecognized header format"
1628                                            " for image %s\n",
1629                                            rbd_dev->image_name);
1630                         goto out_dh;
1631                 }
1632
1633                 if (snap_count == header->total_snaps)
1634                         break;
1635
1636                 snap_count = header->total_snaps;
1637                 len = sizeof (*dh) +
1638                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1639                         header->snap_names_len;
1640
1641                 rbd_header_free(header);
1642                 kfree(dh);
1643         }
1644         header->obj_version = ver;
1645
1646 out_dh:
1647         kfree(dh);
1648         return rc;
1649 }
1650
1651 /*
1652  * create a snapshot
1653  */
1654 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1655                                const char *snap_name,
1656                                gfp_t gfp_flags)
1657 {
1658         int name_len = strlen(snap_name);
1659         u64 new_snapid;
1660         int ret;
1661         void *data, *p, *e;
1662         u64 ver;
1663         struct ceph_mon_client *monc;
1664
1665         /* we should create a snapshot only if we're pointing at the head */
1666         if (rbd_dev->snap_id != CEPH_NOSNAP)
1667                 return -EINVAL;
1668
1669         monc = &rbd_dev->rbd_client->client->monc;
1670         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1671         dout("created snapid=%lld\n", new_snapid);
1672         if (ret < 0)
1673                 return ret;
1674
1675         data = kmalloc(name_len + 16, gfp_flags);
1676         if (!data)
1677                 return -ENOMEM;
1678
1679         p = data;
1680         e = data + name_len + 16;
1681
1682         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1683         ceph_encode_64_safe(&p, e, new_snapid, bad);
1684
1685         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1686                                 "rbd", "snap_add",
1687                                 data, p - data, &ver);
1688
1689         kfree(data);
1690
1691         if (ret < 0)
1692                 return ret;
1693
1694         down_write(&rbd_dev->header_rwsem);
1695         rbd_dev->header.snapc->seq = new_snapid;
1696         up_write(&rbd_dev->header_rwsem);
1697
1698         return 0;
1699 bad:
1700         return -ERANGE;
1701 }
1702
1703 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1704 {
1705         struct rbd_snap *snap;
1706
1707         while (!list_empty(&rbd_dev->snaps)) {
1708                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1709                 __rbd_remove_snap_dev(rbd_dev, snap);
1710         }
1711 }
1712
1713 /*
1714  * only read the first part of the ondisk header, without the snaps info
1715  */
1716 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1717 {
1718         int ret;
1719         struct rbd_image_header h;
1720         u64 snap_seq;
1721         int follow_seq = 0;
1722
1723         ret = rbd_read_header(rbd_dev, &h);
1724         if (ret < 0)
1725                 return ret;
1726
1727         down_write(&rbd_dev->header_rwsem);
1728
1729         /* resized? */
1730         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1731                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1732
1733                 dout("setting size to %llu sectors", (unsigned long long) size);
1734                 set_capacity(rbd_dev->disk, size);
1735         }
1736
1737         snap_seq = rbd_dev->header.snapc->seq;
1738         if (rbd_dev->header.total_snaps &&
1739             rbd_dev->header.snapc->snaps[0] == snap_seq)
1740                 /* pointing at the head, will need to follow that
1741                    if head moves */
1742                 follow_seq = 1;
1743
1744         /* rbd_dev->header.object_prefix shouldn't change */
1745         kfree(rbd_dev->header.snap_sizes);
1746         kfree(rbd_dev->header.snap_names);
1747         kfree(rbd_dev->header.snapc);
1748
1749         rbd_dev->header.total_snaps = h.total_snaps;
1750         rbd_dev->header.snapc = h.snapc;
1751         rbd_dev->header.snap_names = h.snap_names;
1752         rbd_dev->header.snap_names_len = h.snap_names_len;
1753         rbd_dev->header.snap_sizes = h.snap_sizes;
1754         /* Free the extra copy of the object prefix */
1755         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1756         kfree(h.object_prefix);
1757
1758         if (follow_seq)
1759                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1760         else
1761                 rbd_dev->header.snapc->seq = snap_seq;
1762
1763         ret = __rbd_init_snaps_header(rbd_dev);
1764
1765         up_write(&rbd_dev->header_rwsem);
1766
1767         return ret;
1768 }
1769
1770 static int rbd_init_disk(struct rbd_device *rbd_dev)
1771 {
1772         struct gendisk *disk;
1773         struct request_queue *q;
1774         int rc;
1775         u64 segment_size;
1776         u64 total_size = 0;
1777
1778         /* contact OSD, request size info about the object being mapped */
1779         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1780         if (rc)
1781                 return rc;
1782
1783         /* no need to lock here, as rbd_dev is not registered yet */
1784         rc = __rbd_init_snaps_header(rbd_dev);
1785         if (rc)
1786                 return rc;
1787
1788         rc = rbd_header_set_snap(rbd_dev, &total_size);
1789         if (rc)
1790                 return rc;
1791
1792         /* create gendisk info */
1793         rc = -ENOMEM;
1794         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1795         if (!disk)
1796                 goto out;
1797
1798         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1799                  rbd_dev->id);
1800         disk->major = rbd_dev->major;
1801         disk->first_minor = 0;
1802         disk->fops = &rbd_bd_ops;
1803         disk->private_data = rbd_dev;
1804
1805         /* init rq */
1806         rc = -ENOMEM;
1807         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1808         if (!q)
1809                 goto out_disk;
1810
1811         /* We use the default size, but let's be explicit about it. */
1812         blk_queue_physical_block_size(q, SECTOR_SIZE);
1813
1814         /* set io sizes to object size */
1815         segment_size = rbd_obj_bytes(&rbd_dev->header);
1816         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1817         blk_queue_max_segment_size(q, segment_size);
1818         blk_queue_io_min(q, segment_size);
1819         blk_queue_io_opt(q, segment_size);
1820
1821         blk_queue_merge_bvec(q, rbd_merge_bvec);
1822         disk->queue = q;
1823
1824         q->queuedata = rbd_dev;
1825
1826         rbd_dev->disk = disk;
1827         rbd_dev->q = q;
1828
1829         /* finally, announce the disk to the world */
1830         set_capacity(disk, total_size / SECTOR_SIZE);
1831         add_disk(disk);
1832
1833         pr_info("%s: added with size 0x%llx\n",
1834                 disk->disk_name, (unsigned long long)total_size);
1835         return 0;
1836
1837 out_disk:
1838         put_disk(disk);
1839 out:
1840         return rc;
1841 }
1842
1843 /*
1844   sysfs
1845 */
1846
1847 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1848 {
1849         return container_of(dev, struct rbd_device, dev);
1850 }
1851
1852 static ssize_t rbd_size_show(struct device *dev,
1853                              struct device_attribute *attr, char *buf)
1854 {
1855         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1856         sector_t size;
1857
1858         down_read(&rbd_dev->header_rwsem);
1859         size = get_capacity(rbd_dev->disk);
1860         up_read(&rbd_dev->header_rwsem);
1861
1862         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1863 }
1864
1865 static ssize_t rbd_major_show(struct device *dev,
1866                               struct device_attribute *attr, char *buf)
1867 {
1868         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870         return sprintf(buf, "%d\n", rbd_dev->major);
1871 }
1872
1873 static ssize_t rbd_client_id_show(struct device *dev,
1874                                   struct device_attribute *attr, char *buf)
1875 {
1876         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1877
1878         return sprintf(buf, "client%lld\n",
1879                         ceph_client_id(rbd_dev->rbd_client->client));
1880 }
1881
1882 static ssize_t rbd_pool_show(struct device *dev,
1883                              struct device_attribute *attr, char *buf)
1884 {
1885         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886
1887         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1888 }
1889
1890 static ssize_t rbd_pool_id_show(struct device *dev,
1891                              struct device_attribute *attr, char *buf)
1892 {
1893         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894
1895         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1896 }
1897
1898 static ssize_t rbd_name_show(struct device *dev,
1899                              struct device_attribute *attr, char *buf)
1900 {
1901         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1902
1903         return sprintf(buf, "%s\n", rbd_dev->image_name);
1904 }
1905
1906 static ssize_t rbd_snap_show(struct device *dev,
1907                              struct device_attribute *attr,
1908                              char *buf)
1909 {
1910         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1911
1912         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1913 }
1914
1915 static ssize_t rbd_image_refresh(struct device *dev,
1916                                  struct device_attribute *attr,
1917                                  const char *buf,
1918                                  size_t size)
1919 {
1920         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1921         int rc;
1922         int ret = size;
1923
1924         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1925
1926         rc = __rbd_refresh_header(rbd_dev);
1927         if (rc < 0)
1928                 ret = rc;
1929
1930         mutex_unlock(&ctl_mutex);
1931         return ret;
1932 }
1933
1934 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1935 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1936 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1937 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1938 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1939 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1940 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1941 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1942 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1943
1944 static struct attribute *rbd_attrs[] = {
1945         &dev_attr_size.attr,
1946         &dev_attr_major.attr,
1947         &dev_attr_client_id.attr,
1948         &dev_attr_pool.attr,
1949         &dev_attr_pool_id.attr,
1950         &dev_attr_name.attr,
1951         &dev_attr_current_snap.attr,
1952         &dev_attr_refresh.attr,
1953         &dev_attr_create_snap.attr,
1954         NULL
1955 };
1956
1957 static struct attribute_group rbd_attr_group = {
1958         .attrs = rbd_attrs,
1959 };
1960
1961 static const struct attribute_group *rbd_attr_groups[] = {
1962         &rbd_attr_group,
1963         NULL
1964 };
1965
/*
 * Release hook installed in rbd_device_type.  It deliberately does
 * nothing; it frees no memory and touches no state.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	return;
}
1969
1970 static struct device_type rbd_device_type = {
1971         .name           = "rbd",
1972         .groups         = rbd_attr_groups,
1973         .release        = rbd_sysfs_dev_release,
1974 };
1975
1976
1977 /*
1978   sysfs - snapshots
1979 */
1980
1981 static ssize_t rbd_snap_size_show(struct device *dev,
1982                                   struct device_attribute *attr,
1983                                   char *buf)
1984 {
1985         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1986
1987         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1988 }
1989
1990 static ssize_t rbd_snap_id_show(struct device *dev,
1991                                 struct device_attribute *attr,
1992                                 char *buf)
1993 {
1994         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1995
1996         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1997 }
1998
1999 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2000 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2001
2002 static struct attribute *rbd_snap_attrs[] = {
2003         &dev_attr_snap_size.attr,
2004         &dev_attr_snap_id.attr,
2005         NULL,
2006 };
2007
2008 static struct attribute_group rbd_snap_attr_group = {
2009         .attrs = rbd_snap_attrs,
2010 };
2011
2012 static void rbd_snap_dev_release(struct device *dev)
2013 {
2014         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2015         kfree(snap->name);
2016         kfree(snap);
2017 }
2018
2019 static const struct attribute_group *rbd_snap_attr_groups[] = {
2020         &rbd_snap_attr_group,
2021         NULL
2022 };
2023
2024 static struct device_type rbd_snap_device_type = {
2025         .groups         = rbd_snap_attr_groups,
2026         .release        = rbd_snap_dev_release,
2027 };
2028
2029 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2030                                   struct rbd_snap *snap)
2031 {
2032         list_del(&snap->node);
2033         device_unregister(&snap->dev);
2034 }
2035
2036 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2037                                   struct rbd_snap *snap,
2038                                   struct device *parent)
2039 {
2040         struct device *dev = &snap->dev;
2041         int ret;
2042
2043         dev->type = &rbd_snap_device_type;
2044         dev->parent = parent;
2045         dev->release = rbd_snap_dev_release;
2046         dev_set_name(dev, "snap_%s", snap->name);
2047         ret = device_register(dev);
2048
2049         return ret;
2050 }
2051
2052 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2053                               int i, const char *name,
2054                               struct rbd_snap **snapp)
2055 {
2056         int ret;
2057         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2058         if (!snap)
2059                 return -ENOMEM;
2060         snap->name = kstrdup(name, GFP_KERNEL);
2061         snap->size = rbd_dev->header.snap_sizes[i];
2062         snap->id = rbd_dev->header.snapc->snaps[i];
2063         if (device_is_registered(&rbd_dev->dev)) {
2064                 ret = rbd_register_snap_dev(rbd_dev, snap,
2065                                              &rbd_dev->dev);
2066                 if (ret < 0)
2067                         goto err;
2068         }
2069         *snapp = snap;
2070         return 0;
2071 err:
2072         kfree(snap->name);
2073         kfree(snap);
2074         return ret;
2075 }
2076
/*
 * Step backward through a list of '\0'-terminated names stored back
 * to back starting at start.  Given name, which points just past the
 * terminator of some entry, return a pointer to the first character
 * of that entry, or NULL when fewer than two bytes lie between start
 * and name.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* begin on the last character of the entry, skipping its '\0' */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	/* cur rests on the previous entry's terminator */
	return cur + 1;
}
2093
2094 /*
2095  * compare the old list of snapshots that we have to what's in the header
2096  * and update it accordingly. Note that the header holds the snapshots
2097  * in a reverse order (from newest to oldest) and we need to go from
2098  * older to new so that we don't get a duplicate snap name when
2099  * doing the process (e.g., removed snapshot and recreated a new
2100  * one with the same name.
2101  */
2102 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2103 {
2104         const char *name, *first_name;
2105         int i = rbd_dev->header.total_snaps;
2106         struct rbd_snap *snap, *old_snap = NULL;
2107         int ret;
2108         struct list_head *p, *n;
2109
2110         first_name = rbd_dev->header.snap_names;
2111         name = first_name + rbd_dev->header.snap_names_len;
2112
2113         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2114                 u64 cur_id;
2115
2116                 old_snap = list_entry(p, struct rbd_snap, node);
2117
2118                 if (i)
2119                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2120
2121                 if (!i || old_snap->id < cur_id) {
2122                         /*
2123                          * old_snap->id was skipped, thus was
2124                          * removed.  If this rbd_dev is mapped to
2125                          * the removed snapshot, record that it no
2126                          * longer exists, to prevent further I/O.
2127                          */
2128                         if (rbd_dev->snap_id == old_snap->id)
2129                                 rbd_dev->snap_exists = false;
2130                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2131                         continue;
2132                 }
2133                 if (old_snap->id == cur_id) {
2134                         /* we have this snapshot already */
2135                         i--;
2136                         name = rbd_prev_snap_name(name, first_name);
2137                         continue;
2138                 }
2139                 for (; i > 0;
2140                      i--, name = rbd_prev_snap_name(name, first_name)) {
2141                         if (!name) {
2142                                 WARN_ON(1);
2143                                 return -EINVAL;
2144                         }
2145                         cur_id = rbd_dev->header.snapc->snaps[i];
2146                         /* snapshot removal? handle it above */
2147                         if (cur_id >= old_snap->id)
2148                                 break;
2149                         /* a new snapshot */
2150                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2151                         if (ret < 0)
2152                                 return ret;
2153
2154                         /* note that we add it backward so using n and not p */
2155                         list_add(&snap->node, n);
2156                         p = &snap->node;
2157                 }
2158         }
2159         /* we're done going over the old snap list, just add what's left */
2160         for (; i > 0; i--) {
2161                 name = rbd_prev_snap_name(name, first_name);
2162                 if (!name) {
2163                         WARN_ON(1);
2164                         return -EINVAL;
2165                 }
2166                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2167                 if (ret < 0)
2168                         return ret;
2169                 list_add(&snap->node, &rbd_dev->snaps);
2170         }
2171
2172         return 0;
2173 }
2174
2175 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2176 {
2177         int ret;
2178         struct device *dev;
2179         struct rbd_snap *snap;
2180
2181         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2182         dev = &rbd_dev->dev;
2183
2184         dev->bus = &rbd_bus_type;
2185         dev->type = &rbd_device_type;
2186         dev->parent = &rbd_root_dev;
2187         dev->release = rbd_dev_release;
2188         dev_set_name(dev, "%d", rbd_dev->id);
2189         ret = device_register(dev);
2190         if (ret < 0)
2191                 goto out;
2192
2193         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2194                 ret = rbd_register_snap_dev(rbd_dev, snap,
2195                                              &rbd_dev->dev);
2196                 if (ret < 0)
2197                         break;
2198         }
2199 out:
2200         mutex_unlock(&ctl_mutex);
2201         return ret;
2202 }
2203
2204 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2205 {
2206         device_unregister(&rbd_dev->dev);
2207 }
2208
2209 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2210 {
2211         int ret, rc;
2212
2213         do {
2214                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
2215                                          rbd_dev->header.obj_version);
2216                 if (ret == -ERANGE) {
2217                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2218                         rc = __rbd_refresh_header(rbd_dev);
2219                         mutex_unlock(&ctl_mutex);
2220                         if (rc < 0)
2221                                 return rc;
2222                 }
2223         } while (ret == -ERANGE);
2224
2225         return ret;
2226 }
2227
2228 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2229
2230 /*
2231  * Get a unique rbd identifier for the given new rbd_dev, and add
2232  * the rbd_dev to the global list.  The minimum rbd id is 1.
2233  */
2234 static void rbd_id_get(struct rbd_device *rbd_dev)
2235 {
2236         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2237
2238         spin_lock(&rbd_dev_list_lock);
2239         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2240         spin_unlock(&rbd_dev_list_lock);
2241 }
2242
2243 /*
2244  * Remove an rbd_dev from the global list, and record that its
2245  * identifier is no longer in use.
2246  */
2247 static void rbd_id_put(struct rbd_device *rbd_dev)
2248 {
2249         struct list_head *tmp;
2250         int rbd_id = rbd_dev->id;
2251         int max_id;
2252
2253         BUG_ON(rbd_id < 1);
2254
2255         spin_lock(&rbd_dev_list_lock);
2256         list_del_init(&rbd_dev->node);
2257
2258         /*
2259          * If the id being "put" is not the current maximum, there
2260          * is nothing special we need to do.
2261          */
2262         if (rbd_id != atomic64_read(&rbd_id_max)) {
2263                 spin_unlock(&rbd_dev_list_lock);
2264                 return;
2265         }
2266
2267         /*
2268          * We need to update the current maximum id.  Search the
2269          * list to find out what it is.  We're more likely to find
2270          * the maximum at the end, so search the list backward.
2271          */
2272         max_id = 0;
2273         list_for_each_prev(tmp, &rbd_dev_list) {
2274                 struct rbd_device *rbd_dev;
2275
2276                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2277                 if (rbd_id > max_id)
2278                         max_id = rbd_id;
2279         }
2280         spin_unlock(&rbd_dev_list_lock);
2281
2282         /*
2283          * The max id could have been updated by rbd_id_get(), in
2284          * which case it now accurately reflects the new maximum.
2285          * Be careful not to overwrite the maximum value in that
2286          * case.
2287          */
2288         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2289 }
2290
/*
 * Advance *buf past any leading white space so that it points at the
 * first character of the next token (or at the terminating '\0' if
 * there is none), and return the length of that token, i.e. the
 * number of consecutive non-white-space characters found there.
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char white_space[] = " \f\n\r\t\v";
	const char *token = *buf;

	token += strspn(token, white_space);	/* skip to start of token */
	*buf = token;

	return strcspn(token, white_space);	/* length of the token */
}
2309
/*
 * Locate the next token in *buf and, provided it fits together with
 * a terminating '\0' in the token_size bytes at token, copy it there
 * as a '\0'-terminated string.  *buf must be terminated with '\0'.
 *
 * Returns the length of the token (excluding the '\0'): 0 when no
 * token was found, and a value >= token_size when the token did not
 * fit, in which case nothing is written to token.
 *
 * In every case *buf is advanced past the end of the token found.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *src = *buf;

	*buf = src + token_len;
	if (token_len >= token_size)
		return token_len;	/* too big, leave token alone */

	memcpy(token, src, token_len);
	token[token_len] = '\0';

	return token_len;
}
2339
2340 /*
2341  * Finds the next token in *buf, dynamically allocates a buffer big
2342  * enough to hold a copy of it, and copies the token into the new
2343  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2344  * that a duplicate buffer is created even for a zero-length token.
2345  *
2346  * Returns a pointer to the newly-allocated duplicate, or a null
2347  * pointer if memory for the duplicate was not available.  If
2348  * the lenp argument is a non-null pointer, the length of the token
2349  * (not including the '\0') is returned in *lenp.
2350  *
2351  * If successful, the *buf pointer will be updated to point beyond
2352  * the end of the found token.
2353  *
2354  * Note: uses GFP_KERNEL for allocation.
2355  */
2356 static inline char *dup_token(const char **buf, size_t *lenp)
2357 {
2358         char *dup;
2359         size_t len;
2360
2361         len = next_token(buf);
2362         dup = kmalloc(len + 1, GFP_KERNEL);
2363         if (!dup)
2364                 return NULL;
2365
2366         memcpy(dup, *buf, len);
2367         *(dup + len) = '\0';
2368         *buf += len;
2369
2370         if (lenp)
2371                 *lenp = len;
2372
2373         return dup;
2374 }
2375
2376 /*
2377  * This fills in the pool_name, image_name, image_name_len, snap_name,
2378  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2379  * on the list of monitor addresses and other options provided via
2380  * /sys/bus/rbd/add.
2381  *
2382  * Note: rbd_dev is assumed to have been initially zero-filled.
2383  */
2384 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2385                               const char *buf,
2386                               const char **mon_addrs,
2387                               size_t *mon_addrs_size,
2388                               char *options,
2389                              size_t options_size)
2390 {
2391         size_t len;
2392         int ret;
2393
2394         /* The first four tokens are required */
2395
2396         len = next_token(&buf);
2397         if (!len)
2398                 return -EINVAL;
2399         *mon_addrs_size = len + 1;
2400         *mon_addrs = buf;
2401
2402         buf += len;
2403
2404         len = copy_token(&buf, options, options_size);
2405         if (!len || len >= options_size)
2406                 return -EINVAL;
2407
2408         ret = -ENOMEM;
2409         rbd_dev->pool_name = dup_token(&buf, NULL);
2410         if (!rbd_dev->pool_name)
2411                 goto out_err;
2412
2413         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2414         if (!rbd_dev->image_name)
2415                 goto out_err;
2416
2417         /* Create the name of the header object */
2418
2419         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2420                                                 + sizeof (RBD_SUFFIX),
2421                                         GFP_KERNEL);
2422         if (!rbd_dev->header_name)
2423                 goto out_err;
2424         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2425
2426         /*
2427          * The snapshot name is optional.  If none is is supplied,
2428          * we use the default value.
2429          */
2430         rbd_dev->snap_name = dup_token(&buf, &len);
2431         if (!rbd_dev->snap_name)
2432                 goto out_err;
2433         if (!len) {
2434                 /* Replace the empty name with the default */
2435                 kfree(rbd_dev->snap_name);
2436                 rbd_dev->snap_name
2437                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2438                 if (!rbd_dev->snap_name)
2439                         goto out_err;
2440
2441                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2442                         sizeof (RBD_SNAP_HEAD_NAME));
2443         }
2444
2445         return 0;
2446
2447 out_err:
2448         kfree(rbd_dev->header_name);
2449         kfree(rbd_dev->image_name);
2450         kfree(rbd_dev->pool_name);
2451         rbd_dev->pool_name = NULL;
2452
2453         return ret;
2454 }
2455
2456 static ssize_t rbd_add(struct bus_type *bus,
2457                        const char *buf,
2458                        size_t count)
2459 {
2460         char *options;
2461         struct rbd_device *rbd_dev = NULL;
2462         const char *mon_addrs = NULL;
2463         size_t mon_addrs_size = 0;
2464         struct ceph_osd_client *osdc;
2465         int rc = -ENOMEM;
2466
2467         if (!try_module_get(THIS_MODULE))
2468                 return -ENODEV;
2469
2470         options = kmalloc(count, GFP_KERNEL);
2471         if (!options)
2472                 goto err_nomem;
2473         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2474         if (!rbd_dev)
2475                 goto err_nomem;
2476
2477         /* static rbd_device initialization */
2478         spin_lock_init(&rbd_dev->lock);
2479         INIT_LIST_HEAD(&rbd_dev->node);
2480         INIT_LIST_HEAD(&rbd_dev->snaps);
2481         init_rwsem(&rbd_dev->header_rwsem);
2482
2483         init_rwsem(&rbd_dev->header_rwsem);
2484
2485         /* generate unique id: find highest unique id, add one */
2486         rbd_id_get(rbd_dev);
2487
2488         /* Fill in the device name, now that we have its id. */
2489         BUILD_BUG_ON(DEV_NAME_LEN
2490                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2491         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2492
2493         /* parse add command */
2494         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2495                                 options, count);
2496         if (rc)
2497                 goto err_put_id;
2498
2499         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2500                                                 options);
2501         if (IS_ERR(rbd_dev->rbd_client)) {
2502                 rc = PTR_ERR(rbd_dev->rbd_client);
2503                 goto err_put_id;
2504         }
2505
2506         /* pick the pool */
2507         osdc = &rbd_dev->rbd_client->client->osdc;
2508         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2509         if (rc < 0)
2510                 goto err_out_client;
2511         rbd_dev->pool_id = rc;
2512
2513         /* register our block device */
2514         rc = register_blkdev(0, rbd_dev->name);
2515         if (rc < 0)
2516                 goto err_out_client;
2517         rbd_dev->major = rc;
2518
2519         rc = rbd_bus_add_dev(rbd_dev);
2520         if (rc)
2521                 goto err_out_blkdev;
2522
2523         /*
2524          * At this point cleanup in the event of an error is the job
2525          * of the sysfs code (initiated by rbd_bus_del_dev()).
2526          *
2527          * Set up and announce blkdev mapping.
2528          */
2529         rc = rbd_init_disk(rbd_dev);
2530         if (rc)
2531                 goto err_out_bus;
2532
2533         rc = rbd_init_watch_dev(rbd_dev);
2534         if (rc)
2535                 goto err_out_bus;
2536
2537         return count;
2538
2539 err_out_bus:
2540         /* this will also clean up rest of rbd_dev stuff */
2541
2542         rbd_bus_del_dev(rbd_dev);
2543         kfree(options);
2544         return rc;
2545
2546 err_out_blkdev:
2547         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2548 err_out_client:
2549         rbd_put_client(rbd_dev);
2550 err_put_id:
2551         if (rbd_dev->pool_name) {
2552                 kfree(rbd_dev->snap_name);
2553                 kfree(rbd_dev->header_name);
2554                 kfree(rbd_dev->image_name);
2555                 kfree(rbd_dev->pool_name);
2556         }
2557         rbd_id_put(rbd_dev);
2558 err_nomem:
2559         kfree(rbd_dev);
2560         kfree(options);
2561
2562         dout("Error adding device %s\n", buf);
2563         module_put(THIS_MODULE);
2564
2565         return (ssize_t) rc;
2566 }
2567
2568 static struct rbd_device *__rbd_get_dev(unsigned long id)
2569 {
2570         struct list_head *tmp;
2571         struct rbd_device *rbd_dev;
2572
2573         spin_lock(&rbd_dev_list_lock);
2574         list_for_each(tmp, &rbd_dev_list) {
2575                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2576                 if (rbd_dev->id == id) {
2577                         spin_unlock(&rbd_dev_list_lock);
2578                         return rbd_dev;
2579                 }
2580         }
2581         spin_unlock(&rbd_dev_list_lock);
2582         return NULL;
2583 }
2584
2585 static void rbd_dev_release(struct device *dev)
2586 {
2587         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2588
2589         if (rbd_dev->watch_request) {
2590                 struct ceph_client *client = rbd_dev->rbd_client->client;
2591
2592                 ceph_osdc_unregister_linger_request(&client->osdc,
2593                                                     rbd_dev->watch_request);
2594         }
2595         if (rbd_dev->watch_event)
2596                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);
2597
2598         rbd_put_client(rbd_dev);
2599
2600         /* clean up and free blkdev */
2601         rbd_free_disk(rbd_dev);
2602         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2603
2604         /* done with the id, and with the rbd_dev */
2605         kfree(rbd_dev->snap_name);
2606         kfree(rbd_dev->header_name);
2607         kfree(rbd_dev->pool_name);
2608         kfree(rbd_dev->image_name);
2609         rbd_id_put(rbd_dev);
2610         kfree(rbd_dev);
2611
2612         /* release module ref */
2613         module_put(THIS_MODULE);
2614 }
2615
2616 static ssize_t rbd_remove(struct bus_type *bus,
2617                           const char *buf,
2618                           size_t count)
2619 {
2620         struct rbd_device *rbd_dev = NULL;
2621         int target_id, rc;
2622         unsigned long ul;
2623         int ret = count;
2624
2625         rc = strict_strtoul(buf, 10, &ul);
2626         if (rc)
2627                 return rc;
2628
2629         /* convert to int; abort if we lost anything in the conversion */
2630         target_id = (int) ul;
2631         if (target_id != ul)
2632                 return -EINVAL;
2633
2634         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2635
2636         rbd_dev = __rbd_get_dev(target_id);
2637         if (!rbd_dev) {
2638                 ret = -ENOENT;
2639                 goto done;
2640         }
2641
2642         __rbd_remove_all_snaps(rbd_dev);
2643         rbd_bus_del_dev(rbd_dev);
2644
2645 done:
2646         mutex_unlock(&ctl_mutex);
2647         return ret;
2648 }
2649
2650 static ssize_t rbd_snap_add(struct device *dev,
2651                             struct device_attribute *attr,
2652                             const char *buf,
2653                             size_t count)
2654 {
2655         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2656         int ret;
2657         char *name = kmalloc(count + 1, GFP_KERNEL);
2658         if (!name)
2659                 return -ENOMEM;
2660
2661         snprintf(name, count, "%s", buf);
2662
2663         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2664
2665         ret = rbd_header_add_snap(rbd_dev,
2666                                   name, GFP_KERNEL);
2667         if (ret < 0)
2668                 goto err_unlock;
2669
2670         ret = __rbd_refresh_header(rbd_dev);
2671         if (ret < 0)
2672                 goto err_unlock;
2673
2674         /* shouldn't hold ctl_mutex when notifying.. notify might
2675            trigger a watch callback that would need to get that mutex */
2676         mutex_unlock(&ctl_mutex);
2677
2678         /* make a best effort, don't error if failed */
2679         rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
2680
2681         ret = count;
2682         kfree(name);
2683         return ret;
2684
2685 err_unlock:
2686         mutex_unlock(&ctl_mutex);
2687         kfree(name);
2688         return ret;
2689 }
2690
2691 /*
2692  * create control files in sysfs
2693  * /sys/bus/rbd/...
2694  */
2695 static int rbd_sysfs_init(void)
2696 {
2697         int ret;
2698
2699         ret = device_register(&rbd_root_dev);
2700         if (ret < 0)
2701                 return ret;
2702
2703         ret = bus_register(&rbd_bus_type);
2704         if (ret < 0)
2705                 device_unregister(&rbd_root_dev);
2706
2707         return ret;
2708 }
2709
2710 static void rbd_sysfs_cleanup(void)
2711 {
2712         bus_unregister(&rbd_bus_type);
2713         device_unregister(&rbd_root_dev);
2714 }
2715
2716 int __init rbd_init(void)
2717 {
2718         int rc;
2719
2720         rc = rbd_sysfs_init();
2721         if (rc)
2722                 return rc;
2723         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2724         return 0;
2725 }
2726
2727 void __exit rbd_exit(void)
2728 {
2729         rbd_sysfs_cleanup();
2730 }
2731
2732 module_init(rbd_init);
2733 module_exit(rbd_exit);
2734
2735 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2736 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2737 MODULE_DESCRIPTION("rados block device");
2738
2739 /* following authorship retained from original osdblk.c */
2740 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2741
2742 MODULE_LICENSE("GPL");