]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/* short and long driver names; the long one is for human-readable messages */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* size of rbd_device.obj_md_name: 96 bytes plus sizeof(RBD_SUFFIX) */
#define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
/* size of rbd_device.pool_name */
#define RBD_MAX_POOL_NAME_LEN   64
/* size of rbd_device.snap_name */
#define RBD_MAX_SNAP_NAME_LEN   32
/* NOTE(review): presumably bounds the option string given at map time -- confirm at the user */
#define RBD_MAX_OPT_LEN         1024

/* snapshot name that selects the image head in rbd_header_set_snap() */
#define RBD_SNAP_HEAD_NAME      "-"

/* size of rbd_device.name */
#define DEV_NAME_LEN            32

/* initial rbd_options.notify_timeout (units not visible here -- TODO confirm) */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
/*
 * block device image metadata (in-memory version)
 *
 * Populated from the on-disk header by rbd_header_from_disk(); the three
 * heap members (snapc, snap_names, snap_sizes) are freed by
 * rbd_header_free().
 */
struct rbd_image_header {
        u64 image_size;                 /* converted from the on-disk le64 */
        char block_name[32];            /* copied verbatim from the on-disk header */
        __u8 obj_order;                 /* objects are (1 << obj_order) bytes */
        __u8 crypt_type;
        __u8 comp_type;
        /* taken for write when a snapshot is selected, for read while a
           request is being built */
        struct rw_semaphore snap_rwsem;
        struct ceph_snap_context *snapc; /* snapshot ids live in snapc->snaps[] */
        size_t snap_names_len;          /* byte length of snap_names */
        u64 snap_seq;
        u32 total_snaps;                /* number of entries in the arrays below */

        char *snap_names;               /* NUL-terminated names stored back to back */
        u64 *snap_sizes;                /* image size recorded for each snapshot */

        u64 obj_version;
};
80
/*
 * rbd-specific options, filled in by parse_rbd_opts_token().
 */
struct rbd_options {
        int     notify_timeout;         /* set by "notify_timeout=%d" */
};
84
/*
 * an instance of the client.  multiple devices may share a client.
 *
 * Sharing is reference counted through kref; the last put runs
 * rbd_client_release().
 */
struct rbd_client {
        struct ceph_client      *client;
        struct rbd_options      *rbd_opts;      /* kfree()d when the client is released */
        struct kref             kref;
        struct list_head        node;           /* entry in rbd_client_list */
};
94
95 struct rbd_req_coll;
96
/*
 * a single io request
 *
 * Allocated per osd request in rbd_do_request() and attached to it
 * through r_priv.
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* length requested from the osd */
        int                     coll_index;     /* slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* may be NULL */
};
108
/*
 * Completion record for one slot of a request collection; written by
 * rbd_coll_end_req_index().
 */
struct rbd_req_status {
        int done;       /* non-zero once this slot has finished */
        int rc;         /* result later handed to __blk_end_request() */
        u64 bytes;      /* byte count later handed to __blk_end_request() */
};
114
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119         int                     total;
120         int                     num_done;
121         struct kref             kref;
122         struct rbd_req_status   status[0];
123 };
124
/*
 * One snapshot of a mapped image; linked through node (rbd_device keeps a
 * "snaps" list head).
 */
struct rbd_snap {
        struct  device          dev;
        const char              *name;
        size_t                  size;
        struct list_head        node;
        u64                     id;
};
132
/*
 * a single device
 */
struct rbd_device {
        int                     id;             /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;

        struct ceph_client      *client;        /* same pointer as rbd_client->client */
        struct rbd_client       *rbd_client;    /* shared, reference counted */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;
        char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
        int                     obj_len;
        char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
        char                    pool_name[RBD_MAX_POOL_NAME_LEN];
        int                     poolid;         /* written into the request's file layout */

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
        u32 cur_snap;   /* index+1 of current snapshot within snap context
                           0 - for the head */
        int read_only;  /* set when a snapshot is selected, cleared for the head */

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
173
/* bus type named "rbd" */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
};
177
/* NOTE(review): no static initializer here; presumably spin_lock_init() runs at module init -- confirm */
static spinlock_t node_lock;      /* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
183
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_add(struct device *dev,
187                             struct device_attribute *attr,
188                             const char *buf,
189                             size_t count);
190 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191                                   struct rbd_snap *snap);
192
193
194 static struct rbd_device *dev_to_rbd(struct device *dev)
195 {
196         return container_of(dev, struct rbd_device, dev);
197 }
198
199 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
200 {
201         return get_device(&rbd_dev->dev);
202 }
203
204 static void rbd_put_dev(struct rbd_device *rbd_dev)
205 {
206         put_device(&rbd_dev->dev);
207 }
208
209 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210
211 static int rbd_open(struct block_device *bdev, fmode_t mode)
212 {
213         struct gendisk *disk = bdev->bd_disk;
214         struct rbd_device *rbd_dev = disk->private_data;
215
216         rbd_get_dev(rbd_dev);
217
218         set_device_ro(bdev, rbd_dev->read_only);
219
220         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
221                 return -EROFS;
222
223         return 0;
224 }
225
226 static int rbd_release(struct gendisk *disk, fmode_t mode)
227 {
228         struct rbd_device *rbd_dev = disk->private_data;
229
230         rbd_put_dev(rbd_dev);
231
232         return 0;
233 }
234
/* block device operations: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
240
241 /*
242  * Initialize an rbd client instance.
243  * We own *opt.
244  */
245 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
246                                             struct rbd_options *rbd_opts)
247 {
248         struct rbd_client *rbdc;
249         int ret = -ENOMEM;
250
251         dout("rbd_client_create\n");
252         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
253         if (!rbdc)
254                 goto out_opt;
255
256         kref_init(&rbdc->kref);
257         INIT_LIST_HEAD(&rbdc->node);
258
259         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260         if (IS_ERR(rbdc->client))
261                 goto out_rbdc;
262         opt = NULL; /* Now rbdc->client is responsible for opt */
263
264         ret = ceph_open_session(rbdc->client);
265         if (ret < 0)
266                 goto out_err;
267
268         rbdc->rbd_opts = rbd_opts;
269
270         spin_lock(&node_lock);
271         list_add_tail(&rbdc->node, &rbd_client_list);
272         spin_unlock(&node_lock);
273
274         dout("rbd_client_create created %p\n", rbdc);
275         return rbdc;
276
277 out_err:
278         ceph_destroy_client(rbdc->client);
279 out_rbdc:
280         kfree(rbdc);
281 out_opt:
282         if (opt)
283                 ceph_destroy_options(opt);
284         return ERR_PTR(ret);
285 }
286
287 /*
288  * Find a ceph client with specific addr and configuration.
289  */
290 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
291 {
292         struct rbd_client *client_node;
293
294         if (opt->flags & CEPH_OPT_NOSHARE)
295                 return NULL;
296
297         list_for_each_entry(client_node, &rbd_client_list, node)
298                 if (ceph_compare_options(opt, client_node->client) == 0)
299                         return client_node;
300         return NULL;
301 }
302
/*
 * mount options
 *
 * Token ids are grouped by argument type: parse_rbd_opts_token() treats
 * ids below Opt_last_int as integer options and ids between Opt_last_int
 * and Opt_last_string as string options.
 */
enum {
        Opt_notify_timeout,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
};
313
/* option patterns handed to match_token(); the {-1, NULL} entry ends the table */
static match_table_t rbdopt_tokens = {
        {Opt_notify_timeout, "notify_timeout=%d"},
        /* int args above */
        /* string args above */
        {-1, NULL}
};
320
321 static int parse_rbd_opts_token(char *c, void *private)
322 {
323         struct rbd_options *rbdopt = private;
324         substring_t argstr[MAX_OPT_ARGS];
325         int token, intval, ret;
326
327         token = match_token((char *)c, rbdopt_tokens, argstr);
328         if (token < 0)
329                 return -EINVAL;
330
331         if (token < Opt_last_int) {
332                 ret = match_int(&argstr[0], &intval);
333                 if (ret < 0) {
334                         pr_err("bad mount option arg (not int) "
335                                "at '%s'\n", c);
336                         return ret;
337                 }
338                 dout("got int token %d val %d\n", token, intval);
339         } else if (token > Opt_last_int && token < Opt_last_string) {
340                 dout("got string token %d val %s\n", token,
341                      argstr[0].from);
342         } else {
343                 dout("got token %d\n", token);
344         }
345
346         switch (token) {
347         case Opt_notify_timeout:
348                 rbdopt->notify_timeout = intval;
349                 break;
350         default:
351                 BUG_ON(token);
352         }
353         return 0;
354 }
355
356 /*
357  * Get a ceph client with specific addr and configuration, if one does
358  * not exist create it.
359  */
360 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
361                           char *options)
362 {
363         struct rbd_client *rbdc;
364         struct ceph_options *opt;
365         int ret;
366         struct rbd_options *rbd_opts;
367
368         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
369         if (!rbd_opts)
370                 return -ENOMEM;
371
372         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
373
374         ret = ceph_parse_options(&opt, options, mon_addr,
375                                  mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
376         if (ret < 0)
377                 goto done_err;
378
379         spin_lock(&node_lock);
380         rbdc = __rbd_client_find(opt);
381         if (rbdc) {
382                 ceph_destroy_options(opt);
383                 kfree(rbd_opts);
384
385                 /* using an existing client */
386                 kref_get(&rbdc->kref);
387                 rbd_dev->rbd_client = rbdc;
388                 rbd_dev->client = rbdc->client;
389                 spin_unlock(&node_lock);
390                 return 0;
391         }
392         spin_unlock(&node_lock);
393
394         rbdc = rbd_client_create(opt, rbd_opts);
395         if (IS_ERR(rbdc)) {
396                 ret = PTR_ERR(rbdc);
397                 goto done_err;
398         }
399
400         rbd_dev->rbd_client = rbdc;
401         rbd_dev->client = rbdc->client;
402         return 0;
403 done_err:
404         kfree(rbd_opts);
405         return ret;
406 }
407
408 /*
409  * Destroy ceph client
410  *
411  * Caller must hold node_lock.
412  */
413 static void rbd_client_release(struct kref *kref)
414 {
415         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
416
417         dout("rbd_release_client %p\n", rbdc);
418         list_del(&rbdc->node);
419
420         ceph_destroy_client(rbdc->client);
421         kfree(rbdc->rbd_opts);
422         kfree(rbdc);
423 }
424
425 /*
426  * Drop reference to ceph client node. If it's not referenced anymore, release
427  * it.
428  */
429 static void rbd_put_client(struct rbd_device *rbd_dev)
430 {
431         spin_lock(&node_lock);
432         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
433         spin_unlock(&node_lock);
434         rbd_dev->rbd_client = NULL;
435         rbd_dev->client = NULL;
436 }
437
438 /*
439  * Destroy requests collection
440  */
441 static void rbd_coll_release(struct kref *kref)
442 {
443         struct rbd_req_coll *coll =
444                 container_of(kref, struct rbd_req_coll, kref);
445
446         dout("rbd_coll_release %p\n", coll);
447         kfree(coll);
448 }
449
450 /*
451  * Create a new header structure, translate header format from the on-disk
452  * header.
453  */
454 static int rbd_header_from_disk(struct rbd_image_header *header,
455                                  struct rbd_image_header_ondisk *ondisk,
456                                  int allocated_snaps,
457                                  gfp_t gfp_flags)
458 {
459         int i;
460         u32 snap_count = le32_to_cpu(ondisk->snap_count);
461         int ret = -ENOMEM;
462
463         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
464                 return -ENXIO;
465         }
466
467         init_rwsem(&header->snap_rwsem);
468         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470                                 snap_count *
471                                  sizeof(struct rbd_image_snap_ondisk),
472                                 gfp_flags);
473         if (!header->snapc)
474                 return -ENOMEM;
475         if (snap_count) {
476                 header->snap_names = kmalloc(header->snap_names_len,
477                                              GFP_KERNEL);
478                 if (!header->snap_names)
479                         goto err_snapc;
480                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
481                                              GFP_KERNEL);
482                 if (!header->snap_sizes)
483                         goto err_names;
484         } else {
485                 header->snap_names = NULL;
486                 header->snap_sizes = NULL;
487         }
488         memcpy(header->block_name, ondisk->block_name,
489                sizeof(ondisk->block_name));
490
491         header->image_size = le64_to_cpu(ondisk->image_size);
492         header->obj_order = ondisk->options.order;
493         header->crypt_type = ondisk->options.crypt_type;
494         header->comp_type = ondisk->options.comp_type;
495
496         atomic_set(&header->snapc->nref, 1);
497         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
498         header->snapc->num_snaps = snap_count;
499         header->total_snaps = snap_count;
500
501         if (snap_count &&
502             allocated_snaps == snap_count) {
503                 for (i = 0; i < snap_count; i++) {
504                         header->snapc->snaps[i] =
505                                 le64_to_cpu(ondisk->snaps[i].id);
506                         header->snap_sizes[i] =
507                                 le64_to_cpu(ondisk->snaps[i].image_size);
508                 }
509
510                 /* copy snapshot names */
511                 memcpy(header->snap_names, &ondisk->snaps[i],
512                         header->snap_names_len);
513         }
514
515         return 0;
516
517 err_names:
518         kfree(header->snap_names);
519 err_snapc:
520         kfree(header->snapc);
521         return ret;
522 }
523
524 static int snap_index(struct rbd_image_header *header, int snap_num)
525 {
526         return header->total_snaps - snap_num;
527 }
528
529 static u64 cur_snap_id(struct rbd_device *rbd_dev)
530 {
531         struct rbd_image_header *header = &rbd_dev->header;
532
533         if (!rbd_dev->cur_snap)
534                 return 0;
535
536         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
537 }
538
539 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
540                         u64 *seq, u64 *size)
541 {
542         int i;
543         char *p = header->snap_names;
544
545         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
546                 if (strcmp(snap_name, p) == 0)
547                         break;
548         }
549         if (i == header->total_snaps)
550                 return -ENOENT;
551         if (seq)
552                 *seq = header->snapc->snaps[i];
553
554         if (size)
555                 *size = header->snap_sizes[i];
556
557         return i;
558 }
559
560 static int rbd_header_set_snap(struct rbd_device *dev,
561                                const char *snap_name,
562                                u64 *size)
563 {
564         struct rbd_image_header *header = &dev->header;
565         struct ceph_snap_context *snapc = header->snapc;
566         int ret = -ENOENT;
567
568         down_write(&header->snap_rwsem);
569
570         if (!snap_name ||
571             !*snap_name ||
572             strcmp(snap_name, "-") == 0 ||
573             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
574                 if (header->total_snaps)
575                         snapc->seq = header->snap_seq;
576                 else
577                         snapc->seq = 0;
578                 dev->cur_snap = 0;
579                 dev->read_only = 0;
580                 if (size)
581                         *size = header->image_size;
582         } else {
583                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
584                 if (ret < 0)
585                         goto done;
586
587                 dev->cur_snap = header->total_snaps - ret;
588                 dev->read_only = 1;
589         }
590
591         ret = 0;
592 done:
593         up_write(&header->snap_rwsem);
594         return ret;
595 }
596
597 static void rbd_header_free(struct rbd_image_header *header)
598 {
599         kfree(header->snapc);
600         kfree(header->snap_names);
601         kfree(header->snap_sizes);
602 }
603
604 /*
605  * get the actual striped segment name, offset and length
606  */
607 static u64 rbd_get_segment(struct rbd_image_header *header,
608                            const char *block_name,
609                            u64 ofs, u64 len,
610                            char *seg_name, u64 *segofs)
611 {
612         u64 seg = ofs >> header->obj_order;
613
614         if (seg_name)
615                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
616                          "%s.%012llx", block_name, seg);
617
618         ofs = ofs & ((1 << header->obj_order) - 1);
619         len = min_t(u64, len, (1 << header->obj_order) - ofs);
620
621         if (segofs)
622                 *segofs = ofs;
623
624         return len;
625 }
626
627 static int rbd_get_num_segments(struct rbd_image_header *header,
628                                 u64 ofs, u64 len)
629 {
630         u64 start_seg = ofs >> header->obj_order;
631         u64 end_seg = (ofs + len - 1) >> header->obj_order;
632         return end_seg - start_seg + 1;
633 }
634
635 /*
636  * returns the size of an object in the image
637  */
638 static u64 rbd_obj_bytes(struct rbd_image_header *header)
639 {
640         return 1 << header->obj_order;
641 }
642
643 /*
644  * bio helpers
645  */
646
647 static void bio_chain_put(struct bio *chain)
648 {
649         struct bio *tmp;
650
651         while (chain) {
652                 tmp = chain;
653                 chain = chain->bi_next;
654                 bio_put(tmp);
655         }
656 }
657
658 /*
659  * zeros a bio chain, starting at specific offset
660  */
661 static void zero_bio_chain(struct bio *chain, int start_ofs)
662 {
663         struct bio_vec *bv;
664         unsigned long flags;
665         void *buf;
666         int i;
667         int pos = 0;
668
669         while (chain) {
670                 bio_for_each_segment(bv, chain, i) {
671                         if (pos + bv->bv_len > start_ofs) {
672                                 int remainder = max(start_ofs - pos, 0);
673                                 buf = bvec_kmap_irq(bv, &flags);
674                                 memset(buf + remainder, 0,
675                                        bv->bv_len - remainder);
676                                 bvec_kunmap_irq(buf, &flags);
677                         }
678                         pos += bv->bv_len;
679                 }
680
681                 chain = chain->bi_next;
682         }
683 }
684
685 /*
686  * bio_chain_clone - clone a chain of bios up to a certain length.
687  * might return a bio_pair that will need to be released.
688  */
689 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
690                                    struct bio_pair **bp,
691                                    int len, gfp_t gfpmask)
692 {
693         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
694         int total = 0;
695
696         if (*bp) {
697                 bio_pair_release(*bp);
698                 *bp = NULL;
699         }
700
701         while (old_chain && (total < len)) {
702                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
703                 if (!tmp)
704                         goto err_out;
705
706                 if (total + old_chain->bi_size > len) {
707                         struct bio_pair *bp;
708
709                         /*
710                          * this split can only happen with a single paged bio,
711                          * split_bio will BUG_ON if this is not the case
712                          */
713                         dout("bio_chain_clone split! total=%d remaining=%d"
714                              "bi_size=%d\n",
715                              (int)total, (int)len-total,
716                              (int)old_chain->bi_size);
717
718                         /* split the bio. We'll release it either in the next
719                            call, or it will have to be released outside */
720                         bp = bio_split(old_chain, (len - total) / 512ULL);
721                         if (!bp)
722                                 goto err_out;
723
724                         __bio_clone(tmp, &bp->bio1);
725
726                         *next = &bp->bio2;
727                 } else {
728                         __bio_clone(tmp, old_chain);
729                         *next = old_chain->bi_next;
730                 }
731
732                 tmp->bi_bdev = NULL;
733                 gfpmask &= ~__GFP_WAIT;
734                 tmp->bi_next = NULL;
735
736                 if (!new_chain) {
737                         new_chain = tail = tmp;
738                 } else {
739                         tail->bi_next = tmp;
740                         tail = tmp;
741                 }
742                 old_chain = old_chain->bi_next;
743
744                 total += tmp->bi_size;
745         }
746
747         BUG_ON(total < len);
748
749         if (tail)
750                 tail->bi_next = NULL;
751
752         *old = old_chain;
753
754         return new_chain;
755
756 err_out:
757         dout("bio_chain_clone with err\n");
758         bio_chain_put(new_chain);
759         return NULL;
760 }
761
762 /*
763  * helpers for osd request op vectors.
764  */
765 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
766                             int num_ops,
767                             int opcode,
768                             u32 payload_len)
769 {
770         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
771                        GFP_NOIO);
772         if (!*ops)
773                 return -ENOMEM;
774         (*ops)[0].op = opcode;
775         /*
776          * op extent offset and length will be set later on
777          * in calc_raw_layout()
778          */
779         (*ops)[0].payload_len = payload_len;
780         return 0;
781 }
782
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        struct ceph_osd_req_op *vec = ops;

        kfree(vec);
}
787
788 static void rbd_coll_end_req_index(struct request *rq,
789                                    struct rbd_req_coll *coll,
790                                    int index,
791                                    int ret, u64 len)
792 {
793         struct request_queue *q;
794         int min, max, i;
795
796         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
797              coll, index, ret, len);
798
799         if (!rq)
800                 return;
801
802         if (!coll) {
803                 blk_end_request(rq, ret, len);
804                 return;
805         }
806
807         q = rq->q;
808
809         spin_lock_irq(q->queue_lock);
810         coll->status[index].done = 1;
811         coll->status[index].rc = ret;
812         coll->status[index].bytes = len;
813         max = min = coll->num_done;
814         while (max < coll->total && coll->status[max].done)
815                 max++;
816
817         for (i = min; i<max; i++) {
818                 __blk_end_request(rq, coll->status[i].rc,
819                                   coll->status[i].bytes);
820                 coll->num_done++;
821                 kref_put(&coll->kref, rbd_coll_release);
822         }
823         spin_unlock_irq(q->queue_lock);
824 }
825
826 static void rbd_coll_end_req(struct rbd_request *req,
827                              int ret, u64 len)
828 {
829         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
830 }
831
832 /*
833  * Send ceph osd request
834  */
835 static int rbd_do_request(struct request *rq,
836                           struct rbd_device *dev,
837                           struct ceph_snap_context *snapc,
838                           u64 snapid,
839                           const char *obj, u64 ofs, u64 len,
840                           struct bio *bio,
841                           struct page **pages,
842                           int num_pages,
843                           int flags,
844                           struct ceph_osd_req_op *ops,
845                           int num_reply,
846                           struct rbd_req_coll *coll,
847                           int coll_index,
848                           void (*rbd_cb)(struct ceph_osd_request *req,
849                                          struct ceph_msg *msg),
850                           struct ceph_osd_request **linger_req,
851                           u64 *ver)
852 {
853         struct ceph_osd_request *req;
854         struct ceph_file_layout *layout;
855         int ret;
856         u64 bno;
857         struct timespec mtime = CURRENT_TIME;
858         struct rbd_request *req_data;
859         struct ceph_osd_request_head *reqhead;
860         struct rbd_image_header *header = &dev->header;
861
862         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
863         if (!req_data) {
864                 if (coll)
865                         rbd_coll_end_req_index(rq, coll, coll_index,
866                                                -ENOMEM, len);
867                 return -ENOMEM;
868         }
869
870         if (coll) {
871                 req_data->coll = coll;
872                 req_data->coll_index = coll_index;
873         }
874
875         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
876
877         down_read(&header->snap_rwsem);
878
879         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
880                                       snapc,
881                                       ops,
882                                       false,
883                                       GFP_NOIO, pages, bio);
884         if (!req) {
885                 up_read(&header->snap_rwsem);
886                 ret = -ENOMEM;
887                 goto done_pages;
888         }
889
890         req->r_callback = rbd_cb;
891
892         req_data->rq = rq;
893         req_data->bio = bio;
894         req_data->pages = pages;
895         req_data->len = len;
896
897         req->r_priv = req_data;
898
899         reqhead = req->r_request->front.iov_base;
900         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
901
902         strncpy(req->r_oid, obj, sizeof(req->r_oid));
903         req->r_oid_len = strlen(req->r_oid);
904
905         layout = &req->r_file_layout;
906         memset(layout, 0, sizeof(*layout));
907         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
908         layout->fl_stripe_count = cpu_to_le32(1);
909         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
910         layout->fl_pg_preferred = cpu_to_le32(-1);
911         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
912         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
913                              ofs, &len, &bno, req, ops);
914
915         ceph_osdc_build_request(req, ofs, &len,
916                                 ops,
917                                 snapc,
918                                 &mtime,
919                                 req->r_oid, req->r_oid_len);
920         up_read(&header->snap_rwsem);
921
922         if (linger_req) {
923                 ceph_osdc_set_request_linger(&dev->client->osdc, req);
924                 *linger_req = req;
925         }
926
927         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
928         if (ret < 0)
929                 goto done_err;
930
931         if (!rbd_cb) {
932                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
933                 if (ver)
934                         *ver = le64_to_cpu(req->r_reassert_version.version);
935                 dout("reassert_ver=%lld\n",
936                      le64_to_cpu(req->r_reassert_version.version));
937                 ceph_osdc_put_request(req);
938         }
939         return ret;
940
941 done_err:
942         bio_chain_put(req_data->bio);
943         ceph_osdc_put_request(req);
944 done_pages:
945         rbd_coll_end_req(req_data, ret, len);
946         kfree(req_data);
947         return ret;
948 }
949
950 /*
951  * Ceph osd op callback
952  */
953 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
954 {
955         struct rbd_request *req_data = req->r_priv;
956         struct ceph_osd_reply_head *replyhead;
957         struct ceph_osd_op *op;
958         __s32 rc;
959         u64 bytes;
960         int read_op;
961
962         /* parse reply */
963         replyhead = msg->front.iov_base;
964         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
965         op = (void *)(replyhead + 1);
966         rc = le32_to_cpu(replyhead->result);
967         bytes = le64_to_cpu(op->extent.length);
968         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
969
970         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
971
972         if (rc == -ENOENT && read_op) {
973                 zero_bio_chain(req_data->bio, 0);
974                 rc = 0;
975         } else if (rc == 0 && read_op && bytes < req_data->len) {
976                 zero_bio_chain(req_data->bio, bytes);
977                 bytes = req_data->len;
978         }
979
980         rbd_coll_end_req(req_data, rc, bytes);
981
982         if (req_data->bio)
983                 bio_chain_put(req_data->bio);
984
985         ceph_osdc_put_request(req);
986         kfree(req_data);
987 }
988
/*
 * Minimal completion callback: the reply message is not looked at, the
 * only thing done is dropping a reference on the OSD request.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
                              struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
993
994 /*
995  * Do a synchronous ceph osd operation
996  */
997 static int rbd_req_sync_op(struct rbd_device *dev,
998                            struct ceph_snap_context *snapc,
999                            u64 snapid,
1000                            int opcode,
1001                            int flags,
1002                            struct ceph_osd_req_op *orig_ops,
1003                            int num_reply,
1004                            const char *obj,
1005                            u64 ofs, u64 len,
1006                            char *buf,
1007                            struct ceph_osd_request **linger_req,
1008                            u64 *ver)
1009 {
1010         int ret;
1011         struct page **pages;
1012         int num_pages;
1013         struct ceph_osd_req_op *ops = orig_ops;
1014         u32 payload_len;
1015
1016         num_pages = calc_pages_for(ofs , len);
1017         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1018         if (IS_ERR(pages))
1019                 return PTR_ERR(pages);
1020
1021         if (!orig_ops) {
1022                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1023                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1024                 if (ret < 0)
1025                         goto done;
1026
1027                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1028                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1029                         if (ret < 0)
1030                                 goto done_ops;
1031                 }
1032         }
1033
1034         ret = rbd_do_request(NULL, dev, snapc, snapid,
1035                           obj, ofs, len, NULL,
1036                           pages, num_pages,
1037                           flags,
1038                           ops,
1039                           2,
1040                           NULL, 0,
1041                           NULL,
1042                           linger_req, ver);
1043         if (ret < 0)
1044                 goto done_ops;
1045
1046         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1047                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1048
1049 done_ops:
1050         if (!orig_ops)
1051                 rbd_destroy_ops(ops);
1052 done:
1053         ceph_release_page_vector(pages, num_pages);
1054         return ret;
1055 }
1056
1057 /*
1058  * Do an asynchronous ceph osd operation
1059  */
1060 static int rbd_do_op(struct request *rq,
1061                      struct rbd_device *rbd_dev ,
1062                      struct ceph_snap_context *snapc,
1063                      u64 snapid,
1064                      int opcode, int flags, int num_reply,
1065                      u64 ofs, u64 len,
1066                      struct bio *bio,
1067                      struct rbd_req_coll *coll,
1068                      int coll_index)
1069 {
1070         char *seg_name;
1071         u64 seg_ofs;
1072         u64 seg_len;
1073         int ret;
1074         struct ceph_osd_req_op *ops;
1075         u32 payload_len;
1076
1077         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1078         if (!seg_name)
1079                 return -ENOMEM;
1080
1081         seg_len = rbd_get_segment(&rbd_dev->header,
1082                                   rbd_dev->header.block_name,
1083                                   ofs, len,
1084                                   seg_name, &seg_ofs);
1085
1086         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1087
1088         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1089         if (ret < 0)
1090                 goto done;
1091
1092         /* we've taken care of segment sizes earlier when we
1093            cloned the bios. We should never have a segment
1094            truncated at this point */
1095         BUG_ON(seg_len < len);
1096
1097         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1098                              seg_name, seg_ofs, seg_len,
1099                              bio,
1100                              NULL, 0,
1101                              flags,
1102                              ops,
1103                              num_reply,
1104                              coll, coll_index,
1105                              rbd_req_cb, 0, NULL);
1106
1107         rbd_destroy_ops(ops);
1108 done:
1109         kfree(seg_name);
1110         return ret;
1111 }
1112
1113 /*
1114  * Request async osd write
1115  */
1116 static int rbd_req_write(struct request *rq,
1117                          struct rbd_device *rbd_dev,
1118                          struct ceph_snap_context *snapc,
1119                          u64 ofs, u64 len,
1120                          struct bio *bio,
1121                          struct rbd_req_coll *coll,
1122                          int coll_index)
1123 {
1124         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1125                          CEPH_OSD_OP_WRITE,
1126                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1127                          2,
1128                          ofs, len, bio, coll, coll_index);
1129 }
1130
1131 /*
1132  * Request async osd read
1133  */
1134 static int rbd_req_read(struct request *rq,
1135                          struct rbd_device *rbd_dev,
1136                          u64 snapid,
1137                          u64 ofs, u64 len,
1138                          struct bio *bio,
1139                          struct rbd_req_coll *coll,
1140                          int coll_index)
1141 {
1142         return rbd_do_op(rq, rbd_dev, NULL,
1143                          (snapid ? snapid : CEPH_NOSNAP),
1144                          CEPH_OSD_OP_READ,
1145                          CEPH_OSD_FLAG_READ,
1146                          2,
1147                          ofs, len, bio, coll, coll_index);
1148 }
1149
1150 /*
1151  * Request sync osd read
1152  */
1153 static int rbd_req_sync_read(struct rbd_device *dev,
1154                           struct ceph_snap_context *snapc,
1155                           u64 snapid,
1156                           const char *obj,
1157                           u64 ofs, u64 len,
1158                           char *buf,
1159                           u64 *ver)
1160 {
1161         return rbd_req_sync_op(dev, NULL,
1162                                (snapid ? snapid : CEPH_NOSNAP),
1163                                CEPH_OSD_OP_READ,
1164                                CEPH_OSD_FLAG_READ,
1165                                NULL,
1166                                1, obj, ofs, len, buf, NULL, ver);
1167 }
1168
1169 /*
1170  * Request sync osd watch
1171  */
1172 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1173                                    u64 ver,
1174                                    u64 notify_id,
1175                                    const char *obj)
1176 {
1177         struct ceph_osd_req_op *ops;
1178         struct page **pages = NULL;
1179         int ret;
1180
1181         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1182         if (ret < 0)
1183                 return ret;
1184
1185         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1186         ops[0].watch.cookie = notify_id;
1187         ops[0].watch.flag = 0;
1188
1189         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1190                           obj, 0, 0, NULL,
1191                           pages, 0,
1192                           CEPH_OSD_FLAG_READ,
1193                           ops,
1194                           1,
1195                           NULL, 0,
1196                           rbd_simple_req_cb, 0, NULL);
1197
1198         rbd_destroy_ops(ops);
1199         return ret;
1200 }
1201
1202 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1203 {
1204         struct rbd_device *dev = (struct rbd_device *)data;
1205         int rc;
1206
1207         if (!dev)
1208                 return;
1209
1210         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1211                 notify_id, (int)opcode);
1212         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1213         rc = __rbd_update_snaps(dev);
1214         mutex_unlock(&ctl_mutex);
1215         if (rc)
1216                 pr_warning(DRV_NAME "%d got notification but failed to update"
1217                            " snaps: %d\n", dev->major, rc);
1218
1219         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1220 }
1221
1222 /*
1223  * Request sync osd watch
1224  */
1225 static int rbd_req_sync_watch(struct rbd_device *dev,
1226                               const char *obj,
1227                               u64 ver)
1228 {
1229         struct ceph_osd_req_op *ops;
1230         struct ceph_osd_client *osdc = &dev->client->osdc;
1231
1232         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1233         if (ret < 0)
1234                 return ret;
1235
1236         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1237                                      (void *)dev, &dev->watch_event);
1238         if (ret < 0)
1239                 goto fail;
1240
1241         ops[0].watch.ver = cpu_to_le64(ver);
1242         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1243         ops[0].watch.flag = 1;
1244
1245         ret = rbd_req_sync_op(dev, NULL,
1246                               CEPH_NOSNAP,
1247                               0,
1248                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1249                               ops,
1250                               1, obj, 0, 0, NULL,
1251                               &dev->watch_request, NULL);
1252
1253         if (ret < 0)
1254                 goto fail_event;
1255
1256         rbd_destroy_ops(ops);
1257         return 0;
1258
1259 fail_event:
1260         ceph_osdc_cancel_event(dev->watch_event);
1261         dev->watch_event = NULL;
1262 fail:
1263         rbd_destroy_ops(ops);
1264         return ret;
1265 }
1266
1267 /*
1268  * Request sync osd unwatch
1269  */
1270 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1271                                 const char *obj)
1272 {
1273         struct ceph_osd_req_op *ops;
1274
1275         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1276         if (ret < 0)
1277                 return ret;
1278
1279         ops[0].watch.ver = 0;
1280         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1281         ops[0].watch.flag = 0;
1282
1283         ret = rbd_req_sync_op(dev, NULL,
1284                               CEPH_NOSNAP,
1285                               0,
1286                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1287                               ops,
1288                               1, obj, 0, 0, NULL, NULL, NULL);
1289
1290         rbd_destroy_ops(ops);
1291         ceph_osdc_cancel_event(dev->watch_event);
1292         dev->watch_event = NULL;
1293         return ret;
1294 }
1295
/* Context attached to the event created by rbd_req_sync_notify(). */
struct rbd_notify_info {
        struct rbd_device *dev;         /* device the notify is sent for */
};
1299
1300 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1301 {
1302         struct rbd_device *dev = (struct rbd_device *)data;
1303         if (!dev)
1304                 return;
1305
1306         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1307                 notify_id, (int)opcode);
1308 }
1309
1310 /*
1311  * Request sync osd notify
1312  */
1313 static int rbd_req_sync_notify(struct rbd_device *dev,
1314                           const char *obj)
1315 {
1316         struct ceph_osd_req_op *ops;
1317         struct ceph_osd_client *osdc = &dev->client->osdc;
1318         struct ceph_osd_event *event;
1319         struct rbd_notify_info info;
1320         int payload_len = sizeof(u32) + sizeof(u32);
1321         int ret;
1322
1323         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1324         if (ret < 0)
1325                 return ret;
1326
1327         info.dev = dev;
1328
1329         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1330                                      (void *)&info, &event);
1331         if (ret < 0)
1332                 goto fail;
1333
1334         ops[0].watch.ver = 1;
1335         ops[0].watch.flag = 1;
1336         ops[0].watch.cookie = event->cookie;
1337         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1338         ops[0].watch.timeout = 12;
1339
1340         ret = rbd_req_sync_op(dev, NULL,
1341                                CEPH_NOSNAP,
1342                                0,
1343                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1344                                ops,
1345                                1, obj, 0, 0, NULL, NULL, NULL);
1346         if (ret < 0)
1347                 goto fail_event;
1348
1349         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1350         dout("ceph_osdc_wait_event returned %d\n", ret);
1351         rbd_destroy_ops(ops);
1352         return 0;
1353
1354 fail_event:
1355         ceph_osdc_cancel_event(event);
1356 fail:
1357         rbd_destroy_ops(ops);
1358         return ret;
1359 }
1360
1361 /*
1362  * Request sync osd read
1363  */
1364 static int rbd_req_sync_exec(struct rbd_device *dev,
1365                              const char *obj,
1366                              const char *cls,
1367                              const char *method,
1368                              const char *data,
1369                              int len,
1370                              u64 *ver)
1371 {
1372         struct ceph_osd_req_op *ops;
1373         int cls_len = strlen(cls);
1374         int method_len = strlen(method);
1375         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1376                                     cls_len + method_len + len);
1377         if (ret < 0)
1378                 return ret;
1379
1380         ops[0].cls.class_name = cls;
1381         ops[0].cls.class_len = (__u8)cls_len;
1382         ops[0].cls.method_name = method;
1383         ops[0].cls.method_len = (__u8)method_len;
1384         ops[0].cls.argc = 0;
1385         ops[0].cls.indata = data;
1386         ops[0].cls.indata_len = len;
1387
1388         ret = rbd_req_sync_op(dev, NULL,
1389                                CEPH_NOSNAP,
1390                                0,
1391                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1392                                ops,
1393                                1, obj, 0, 0, NULL, NULL, ver);
1394
1395         rbd_destroy_ops(ops);
1396
1397         dout("cls_exec returned %d\n", ret);
1398         return ret;
1399 }
1400
1401 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1402 {
1403         struct rbd_req_coll *coll =
1404                         kzalloc(sizeof(struct rbd_req_coll) +
1405                                 sizeof(struct rbd_req_status) * num_reqs,
1406                                 GFP_ATOMIC);
1407
1408         if (!coll)
1409                 return NULL;
1410         coll->total = num_reqs;
1411         kref_init(&coll->kref);
1412         return coll;
1413 }
1414
1415 /*
1416  * block device queue callback
1417  */
1418 static void rbd_rq_fn(struct request_queue *q)
1419 {
1420         struct rbd_device *rbd_dev = q->queuedata;
1421         struct request *rq;
1422         struct bio_pair *bp = NULL;
1423
1424         rq = blk_fetch_request(q);
1425
1426         while (1) {
1427                 struct bio *bio;
1428                 struct bio *rq_bio, *next_bio = NULL;
1429                 bool do_write;
1430                 int size, op_size = 0;
1431                 u64 ofs;
1432                 int num_segs, cur_seg = 0;
1433                 struct rbd_req_coll *coll;
1434
1435                 /* peek at request from block layer */
1436                 if (!rq)
1437                         break;
1438
1439                 dout("fetched request\n");
1440
1441                 /* filter out block requests we don't understand */
1442                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1443                         __blk_end_request_all(rq, 0);
1444                         goto next;
1445                 }
1446
1447                 /* deduce our operation (read, write) */
1448                 do_write = (rq_data_dir(rq) == WRITE);
1449
1450                 size = blk_rq_bytes(rq);
1451                 ofs = blk_rq_pos(rq) * 512ULL;
1452                 rq_bio = rq->bio;
1453                 if (do_write && rbd_dev->read_only) {
1454                         __blk_end_request_all(rq, -EROFS);
1455                         goto next;
1456                 }
1457
1458                 spin_unlock_irq(q->queue_lock);
1459
1460                 dout("%s 0x%x bytes at 0x%llx\n",
1461                      do_write ? "write" : "read",
1462                      size, blk_rq_pos(rq) * 512ULL);
1463
1464                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1465                 coll = rbd_alloc_coll(num_segs);
1466                 if (!coll) {
1467                         spin_lock_irq(q->queue_lock);
1468                         __blk_end_request_all(rq, -ENOMEM);
1469                         goto next;
1470                 }
1471
1472                 do {
1473                         /* a bio clone to be passed down to OSD req */
1474                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1475                         op_size = rbd_get_segment(&rbd_dev->header,
1476                                                   rbd_dev->header.block_name,
1477                                                   ofs, size,
1478                                                   NULL, NULL);
1479                         kref_get(&coll->kref);
1480                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1481                                               op_size, GFP_ATOMIC);
1482                         if (!bio) {
1483                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1484                                                        -ENOMEM, op_size);
1485                                 goto next_seg;
1486                         }
1487
1488
1489                         /* init OSD command: write or read */
1490                         if (do_write)
1491                                 rbd_req_write(rq, rbd_dev,
1492                                               rbd_dev->header.snapc,
1493                                               ofs,
1494                                               op_size, bio,
1495                                               coll, cur_seg);
1496                         else
1497                                 rbd_req_read(rq, rbd_dev,
1498                                              cur_snap_id(rbd_dev),
1499                                              ofs,
1500                                              op_size, bio,
1501                                              coll, cur_seg);
1502
1503 next_seg:
1504                         size -= op_size;
1505                         ofs += op_size;
1506
1507                         cur_seg++;
1508                         rq_bio = next_bio;
1509                 } while (size > 0);
1510                 kref_put(&coll->kref, rbd_coll_release);
1511
1512                 if (bp)
1513                         bio_pair_release(bp);
1514                 spin_lock_irq(q->queue_lock);
1515 next:
1516                 rq = blk_fetch_request(q);
1517         }
1518 }
1519
1520 /*
1521  * a queue callback. Makes sure that we don't create a bio that spans across
1522  * multiple osd objects. One exception would be with a single page bios,
1523  * which we handle later at bio_chain_clone
1524  */
1525 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1526                           struct bio_vec *bvec)
1527 {
1528         struct rbd_device *rbd_dev = q->queuedata;
1529         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1530         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1531         unsigned int bio_sectors = bmd->bi_size >> 9;
1532         int max;
1533
1534         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1535                                  + bio_sectors)) << 9;
1536         if (max < 0)
1537                 max = 0; /* bio_add cannot handle a negative return */
1538         if (max <= bvec->bv_len && bio_sectors == 0)
1539                 return bvec->bv_len;
1540         return max;
1541 }
1542
1543 static void rbd_free_disk(struct rbd_device *rbd_dev)
1544 {
1545         struct gendisk *disk = rbd_dev->disk;
1546
1547         if (!disk)
1548                 return;
1549
1550         rbd_header_free(&rbd_dev->header);
1551
1552         if (disk->flags & GENHD_FL_UP)
1553                 del_gendisk(disk);
1554         if (disk->queue)
1555                 blk_cleanup_queue(disk->queue);
1556         put_disk(disk);
1557 }
1558
1559 /*
1560  * reload the ondisk the header 
1561  */
1562 static int rbd_read_header(struct rbd_device *rbd_dev,
1563                            struct rbd_image_header *header)
1564 {
1565         ssize_t rc;
1566         struct rbd_image_header_ondisk *dh;
1567         int snap_count = 0;
1568         u64 snap_names_len = 0;
1569         u64 ver;
1570
1571         while (1) {
1572                 int len = sizeof(*dh) +
1573                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1574                           snap_names_len;
1575
1576                 rc = -ENOMEM;
1577                 dh = kmalloc(len, GFP_KERNEL);
1578                 if (!dh)
1579                         return -ENOMEM;
1580
1581                 rc = rbd_req_sync_read(rbd_dev,
1582                                        NULL, CEPH_NOSNAP,
1583                                        rbd_dev->obj_md_name,
1584                                        0, len,
1585                                        (char *)dh, &ver);
1586                 if (rc < 0)
1587                         goto out_dh;
1588
1589                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1590                 if (rc < 0) {
1591                         if (rc == -ENXIO) {
1592                                 pr_warning("unrecognized header format"
1593                                            " for image %s", rbd_dev->obj);
1594                         }
1595                         goto out_dh;
1596                 }
1597
1598                 if (snap_count != header->total_snaps) {
1599                         snap_count = header->total_snaps;
1600                         snap_names_len = header->snap_names_len;
1601                         rbd_header_free(header);
1602                         kfree(dh);
1603                         continue;
1604                 }
1605                 break;
1606         }
1607         header->obj_version = ver;
1608
1609 out_dh:
1610         kfree(dh);
1611         return rc;
1612 }
1613
1614 /*
1615  * create a snapshot
1616  */
1617 static int rbd_header_add_snap(struct rbd_device *dev,
1618                                const char *snap_name,
1619                                gfp_t gfp_flags)
1620 {
1621         int name_len = strlen(snap_name);
1622         u64 new_snapid;
1623         int ret;
1624         void *data, *p, *e;
1625         u64 ver;
1626
1627         /* we should create a snapshot only if we're pointing at the head */
1628         if (dev->cur_snap)
1629                 return -EINVAL;
1630
1631         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1632                                       &new_snapid);
1633         dout("created snapid=%lld\n", new_snapid);
1634         if (ret < 0)
1635                 return ret;
1636
1637         data = kmalloc(name_len + 16, gfp_flags);
1638         if (!data)
1639                 return -ENOMEM;
1640
1641         p = data;
1642         e = data + name_len + 16;
1643
1644         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1645         ceph_encode_64_safe(&p, e, new_snapid, bad);
1646
1647         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1648                                 data, p - data, &ver);
1649
1650         kfree(data);
1651
1652         if (ret < 0)
1653                 return ret;
1654
1655         dev->header.snapc->seq =  new_snapid;
1656
1657         return 0;
1658 bad:
1659         return -ERANGE;
1660 }
1661
1662 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1663 {
1664         struct rbd_snap *snap;
1665
1666         while (!list_empty(&rbd_dev->snaps)) {
1667                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1668                 __rbd_remove_snap_dev(rbd_dev, snap);
1669         }
1670 }
1671
1672 /*
1673  * only read the first part of the ondisk header, without the snaps info
1674  */
1675 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1676 {
1677         int ret;
1678         struct rbd_image_header h;
1679         u64 snap_seq;
1680         int follow_seq = 0;
1681
1682         ret = rbd_read_header(rbd_dev, &h);
1683         if (ret < 0)
1684                 return ret;
1685
1686         /* resized? */
1687         set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1688
1689         down_write(&rbd_dev->header.snap_rwsem);
1690
1691         snap_seq = rbd_dev->header.snapc->seq;
1692         if (rbd_dev->header.total_snaps &&
1693             rbd_dev->header.snapc->snaps[0] == snap_seq)
1694                 /* pointing at the head, will need to follow that
1695                    if head moves */
1696                 follow_seq = 1;
1697
1698         kfree(rbd_dev->header.snapc);
1699         kfree(rbd_dev->header.snap_names);
1700         kfree(rbd_dev->header.snap_sizes);
1701
1702         rbd_dev->header.total_snaps = h.total_snaps;
1703         rbd_dev->header.snapc = h.snapc;
1704         rbd_dev->header.snap_names = h.snap_names;
1705         rbd_dev->header.snap_names_len = h.snap_names_len;
1706         rbd_dev->header.snap_sizes = h.snap_sizes;
1707         if (follow_seq)
1708                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1709         else
1710                 rbd_dev->header.snapc->seq = snap_seq;
1711
1712         ret = __rbd_init_snaps_header(rbd_dev);
1713
1714         up_write(&rbd_dev->header.snap_rwsem);
1715
1716         return ret;
1717 }
1718
1719 static int rbd_init_disk(struct rbd_device *rbd_dev)
1720 {
1721         struct gendisk *disk;
1722         struct request_queue *q;
1723         int rc;
1724         u64 total_size = 0;
1725
1726         /* contact OSD, request size info about the object being mapped */
1727         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1728         if (rc)
1729                 return rc;
1730
1731         /* no need to lock here, as rbd_dev is not registered yet */
1732         rc = __rbd_init_snaps_header(rbd_dev);
1733         if (rc)
1734                 return rc;
1735
1736         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1737         if (rc)
1738                 return rc;
1739
1740         /* create gendisk info */
1741         rc = -ENOMEM;
1742         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1743         if (!disk)
1744                 goto out;
1745
1746         snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1747                  rbd_dev->id);
1748         disk->major = rbd_dev->major;
1749         disk->first_minor = 0;
1750         disk->fops = &rbd_bd_ops;
1751         disk->private_data = rbd_dev;
1752
1753         /* init rq */
1754         rc = -ENOMEM;
1755         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1756         if (!q)
1757                 goto out_disk;
1758
1759         /* set io sizes to object size */
1760         blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1761         blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1762         blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1763         blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1764
1765         blk_queue_merge_bvec(q, rbd_merge_bvec);
1766         disk->queue = q;
1767
1768         q->queuedata = rbd_dev;
1769
1770         rbd_dev->disk = disk;
1771         rbd_dev->q = q;
1772
1773         /* finally, announce the disk to the world */
1774         set_capacity(disk, total_size / 512ULL);
1775         add_disk(disk);
1776
1777         pr_info("%s: added with size 0x%llx\n",
1778                 disk->disk_name, (unsigned long long)total_size);
1779         return 0;
1780
1781 out_disk:
1782         put_disk(disk);
1783 out:
1784         return rc;
1785 }
1786
1787 /*
1788   sysfs
1789 */
1790
1791 static ssize_t rbd_size_show(struct device *dev,
1792                              struct device_attribute *attr, char *buf)
1793 {
1794         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1795
1796         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1797 }
1798
1799 static ssize_t rbd_major_show(struct device *dev,
1800                               struct device_attribute *attr, char *buf)
1801 {
1802         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1803
1804         return sprintf(buf, "%d\n", rbd_dev->major);
1805 }
1806
1807 static ssize_t rbd_client_id_show(struct device *dev,
1808                                   struct device_attribute *attr, char *buf)
1809 {
1810         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1811
1812         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1813 }
1814
1815 static ssize_t rbd_pool_show(struct device *dev,
1816                              struct device_attribute *attr, char *buf)
1817 {
1818         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1819
1820         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1821 }
1822
1823 static ssize_t rbd_name_show(struct device *dev,
1824                              struct device_attribute *attr, char *buf)
1825 {
1826         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1827
1828         return sprintf(buf, "%s\n", rbd_dev->obj);
1829 }
1830
1831 static ssize_t rbd_snap_show(struct device *dev,
1832                              struct device_attribute *attr,
1833                              char *buf)
1834 {
1835         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1836
1837         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1838 }
1839
1840 static ssize_t rbd_image_refresh(struct device *dev,
1841                                  struct device_attribute *attr,
1842                                  const char *buf,
1843                                  size_t size)
1844 {
1845         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1846         int rc;
1847         int ret = size;
1848
1849         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1850
1851         rc = __rbd_update_snaps(rbd_dev);
1852         if (rc < 0)
1853                 ret = rc;
1854
1855         mutex_unlock(&ctl_mutex);
1856         return ret;
1857 }
1858
/* read-only attributes of a mapped rbd device */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

/* write-only (owner) control attributes of a mapped rbd device */
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1867
1868 static struct attribute *rbd_attrs[] = {
1869         &dev_attr_size.attr,
1870         &dev_attr_major.attr,
1871         &dev_attr_client_id.attr,
1872         &dev_attr_pool.attr,
1873         &dev_attr_name.attr,
1874         &dev_attr_current_snap.attr,
1875         &dev_attr_refresh.attr,
1876         &dev_attr_create_snap.attr,
1877         NULL
1878 };
1879
1880 static struct attribute_group rbd_attr_group = {
1881         .attrs = rbd_attrs,
1882 };
1883
1884 static const struct attribute_group *rbd_attr_groups[] = {
1885         &rbd_attr_group,
1886         NULL
1887 };
1888
1889 static void rbd_sysfs_dev_release(struct device *dev)
1890 {
1891 }
1892
1893 static struct device_type rbd_device_type = {
1894         .name           = "rbd",
1895         .groups         = rbd_attr_groups,
1896         .release        = rbd_sysfs_dev_release,
1897 };
1898
1899
1900 /*
1901   sysfs - snapshots
1902 */
1903
1904 static ssize_t rbd_snap_size_show(struct device *dev,
1905                                   struct device_attribute *attr,
1906                                   char *buf)
1907 {
1908         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1909
1910         return sprintf(buf, "%lld\n", (long long)snap->size);
1911 }
1912
1913 static ssize_t rbd_snap_id_show(struct device *dev,
1914                                 struct device_attribute *attr,
1915                                 char *buf)
1916 {
1917         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1918
1919         return sprintf(buf, "%lld\n", (long long)snap->id);
1920 }
1921
1922 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1923 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1924
1925 static struct attribute *rbd_snap_attrs[] = {
1926         &dev_attr_snap_size.attr,
1927         &dev_attr_snap_id.attr,
1928         NULL,
1929 };
1930
1931 static struct attribute_group rbd_snap_attr_group = {
1932         .attrs = rbd_snap_attrs,
1933 };
1934
1935 static void rbd_snap_dev_release(struct device *dev)
1936 {
1937         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1938         kfree(snap->name);
1939         kfree(snap);
1940 }
1941
1942 static const struct attribute_group *rbd_snap_attr_groups[] = {
1943         &rbd_snap_attr_group,
1944         NULL
1945 };
1946
1947 static struct device_type rbd_snap_device_type = {
1948         .groups         = rbd_snap_attr_groups,
1949         .release        = rbd_snap_dev_release,
1950 };
1951
1952 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1953                                   struct rbd_snap *snap)
1954 {
1955         list_del(&snap->node);
1956         device_unregister(&snap->dev);
1957 }
1958
1959 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1960                                   struct rbd_snap *snap,
1961                                   struct device *parent)
1962 {
1963         struct device *dev = &snap->dev;
1964         int ret;
1965
1966         dev->type = &rbd_snap_device_type;
1967         dev->parent = parent;
1968         dev->release = rbd_snap_dev_release;
1969         dev_set_name(dev, "snap_%s", snap->name);
1970         ret = device_register(dev);
1971
1972         return ret;
1973 }
1974
1975 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1976                               int i, const char *name,
1977                               struct rbd_snap **snapp)
1978 {
1979         int ret;
1980         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1981         if (!snap)
1982                 return -ENOMEM;
1983         snap->name = kstrdup(name, GFP_KERNEL);
1984         snap->size = rbd_dev->header.snap_sizes[i];
1985         snap->id = rbd_dev->header.snapc->snaps[i];
1986         if (device_is_registered(&rbd_dev->dev)) {
1987                 ret = rbd_register_snap_dev(rbd_dev, snap,
1988                                              &rbd_dev->dev);
1989                 if (ret < 0)
1990                         goto err;
1991         }
1992         *snapp = snap;
1993         return 0;
1994 err:
1995         kfree(snap->name);
1996         kfree(snap);
1997         return ret;
1998 }
1999
/*
 * Step backwards through a list of NUL-terminated strings stored back to
 * back.
 *
 * @name points at the start of an entry (or just past the last one) and
 * @start at the beginning of the list.  Returns the start of the entry
 * preceding @name, or NULL when @name is less than two bytes past
 * @start.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/*
	 * name - 1 is the terminator of the previous entry, so begin at
	 * name - 2 and walk back to the terminator before it (or to the
	 * very beginning of the list).
	 */
	for (cursor = name - 2; *cursor != '\0'; cursor--) {
		if (cursor == start)
			return start;
	}

	return cursor + 1;
}
2016
2017 /*
2018  * compare the old list of snapshots that we have to what's in the header
2019  * and update it accordingly. Note that the header holds the snapshots
2020  * in a reverse order (from newest to oldest) and we need to go from
2021  * older to new so that we don't get a duplicate snap name when
2022  * doing the process (e.g., removed snapshot and recreated a new
2023  * one with the same name.
2024  */
2025 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2026 {
2027         const char *name, *first_name;
2028         int i = rbd_dev->header.total_snaps;
2029         struct rbd_snap *snap, *old_snap = NULL;
2030         int ret;
2031         struct list_head *p, *n;
2032
2033         first_name = rbd_dev->header.snap_names;
2034         name = first_name + rbd_dev->header.snap_names_len;
2035
2036         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2037                 u64 cur_id;
2038
2039                 old_snap = list_entry(p, struct rbd_snap, node);
2040
2041                 if (i)
2042                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2043
2044                 if (!i || old_snap->id < cur_id) {
2045                         /* old_snap->id was skipped, thus was removed */
2046                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2047                         continue;
2048                 }
2049                 if (old_snap->id == cur_id) {
2050                         /* we have this snapshot already */
2051                         i--;
2052                         name = rbd_prev_snap_name(name, first_name);
2053                         continue;
2054                 }
2055                 for (; i > 0;
2056                      i--, name = rbd_prev_snap_name(name, first_name)) {
2057                         if (!name) {
2058                                 WARN_ON(1);
2059                                 return -EINVAL;
2060                         }
2061                         cur_id = rbd_dev->header.snapc->snaps[i];
2062                         /* snapshot removal? handle it above */
2063                         if (cur_id >= old_snap->id)
2064                                 break;
2065                         /* a new snapshot */
2066                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2067                         if (ret < 0)
2068                                 return ret;
2069
2070                         /* note that we add it backward so using n and not p */
2071                         list_add(&snap->node, n);
2072                         p = &snap->node;
2073                 }
2074         }
2075         /* we're done going over the old snap list, just add what's left */
2076         for (; i > 0; i--) {
2077                 name = rbd_prev_snap_name(name, first_name);
2078                 if (!name) {
2079                         WARN_ON(1);
2080                         return -EINVAL;
2081                 }
2082                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2083                 if (ret < 0)
2084                         return ret;
2085                 list_add(&snap->node, &rbd_dev->snaps);
2086         }
2087
2088         return 0;
2089 }
2090
2091
2092 static void rbd_root_dev_release(struct device *dev)
2093 {
2094 }
2095
2096 static struct device rbd_root_dev = {
2097         .init_name =    "rbd",
2098         .release =      rbd_root_dev_release,
2099 };
2100
2101 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2102 {
2103         int ret = -ENOMEM;
2104         struct device *dev;
2105         struct rbd_snap *snap;
2106
2107         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2108         dev = &rbd_dev->dev;
2109
2110         dev->bus = &rbd_bus_type;
2111         dev->type = &rbd_device_type;
2112         dev->parent = &rbd_root_dev;
2113         dev->release = rbd_dev_release;
2114         dev_set_name(dev, "%d", rbd_dev->id);
2115         ret = device_register(dev);
2116         if (ret < 0)
2117                 goto done_free;
2118
2119         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2120                 ret = rbd_register_snap_dev(rbd_dev, snap,
2121                                              &rbd_dev->dev);
2122                 if (ret < 0)
2123                         break;
2124         }
2125
2126         mutex_unlock(&ctl_mutex);
2127         return 0;
2128 done_free:
2129         mutex_unlock(&ctl_mutex);
2130         return ret;
2131 }
2132
2133 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2134 {
2135         device_unregister(&rbd_dev->dev);
2136 }
2137
2138 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2139 {
2140         int ret, rc;
2141
2142         do {
2143                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2144                                          rbd_dev->header.obj_version);
2145                 if (ret == -ERANGE) {
2146                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2147                         rc = __rbd_update_snaps(rbd_dev);
2148                         mutex_unlock(&ctl_mutex);
2149                         if (rc < 0)
2150                                 return rc;
2151                 }
2152         } while (ret == -ERANGE);
2153
2154         return ret;
2155 }
2156
2157 static ssize_t rbd_add(struct bus_type *bus,
2158                        const char *buf,
2159                        size_t count)
2160 {
2161         struct ceph_osd_client *osdc;
2162         struct rbd_device *rbd_dev;
2163         ssize_t rc = -ENOMEM;
2164         int irc, new_id = 0;
2165         struct list_head *tmp;
2166         char *mon_dev_name;
2167         char *options;
2168
2169         if (!try_module_get(THIS_MODULE))
2170                 return -ENODEV;
2171
2172         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2173         if (!mon_dev_name)
2174                 goto err_out_mod;
2175
2176         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2177         if (!options)
2178                 goto err_mon_dev;
2179
2180         /* new rbd_device object */
2181         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2182         if (!rbd_dev)
2183                 goto err_out_opt;
2184
2185         /* static rbd_device initialization */
2186         spin_lock_init(&rbd_dev->lock);
2187         INIT_LIST_HEAD(&rbd_dev->node);
2188         INIT_LIST_HEAD(&rbd_dev->snaps);
2189
2190         init_rwsem(&rbd_dev->header.snap_rwsem);
2191
2192         /* generate unique id: find highest unique id, add one */
2193         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2194
2195         list_for_each(tmp, &rbd_dev_list) {
2196                 struct rbd_device *rbd_dev;
2197
2198                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2199                 if (rbd_dev->id >= new_id)
2200                         new_id = rbd_dev->id + 1;
2201         }
2202
2203         rbd_dev->id = new_id;
2204
2205         /* add to global list */
2206         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2207
2208         /* parse add command */
2209         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2210                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2211                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2212                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2213                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2214                    mon_dev_name, options, rbd_dev->pool_name,
2215                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2216                 rc = -EINVAL;
2217                 goto err_out_slot;
2218         }
2219
2220         if (rbd_dev->snap_name[0] == 0)
2221                 rbd_dev->snap_name[0] = '-';
2222
2223         rbd_dev->obj_len = strlen(rbd_dev->obj);
2224         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2225                  rbd_dev->obj, RBD_SUFFIX);
2226
2227         /* initialize rest of new object */
2228         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2229         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2230         if (rc < 0)
2231                 goto err_out_slot;
2232
2233         mutex_unlock(&ctl_mutex);
2234
2235         /* pick the pool */
2236         osdc = &rbd_dev->client->osdc;
2237         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2238         if (rc < 0)
2239                 goto err_out_client;
2240         rbd_dev->poolid = rc;
2241
2242         /* register our block device */
2243         irc = register_blkdev(0, rbd_dev->name);
2244         if (irc < 0) {
2245                 rc = irc;
2246                 goto err_out_client;
2247         }
2248         rbd_dev->major = irc;
2249
2250         rc = rbd_bus_add_dev(rbd_dev);
2251         if (rc)
2252                 goto err_out_blkdev;
2253
2254         /* set up and announce blkdev mapping */
2255         rc = rbd_init_disk(rbd_dev);
2256         if (rc)
2257                 goto err_out_bus;
2258
2259         rc = rbd_init_watch_dev(rbd_dev);
2260         if (rc)
2261                 goto err_out_bus;
2262
2263         return count;
2264
2265 err_out_bus:
2266         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2267         list_del_init(&rbd_dev->node);
2268         mutex_unlock(&ctl_mutex);
2269
2270         /* this will also clean up rest of rbd_dev stuff */
2271
2272         rbd_bus_del_dev(rbd_dev);
2273         kfree(options);
2274         kfree(mon_dev_name);
2275         return rc;
2276
2277 err_out_blkdev:
2278         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2279 err_out_client:
2280         rbd_put_client(rbd_dev);
2281         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2282 err_out_slot:
2283         list_del_init(&rbd_dev->node);
2284         mutex_unlock(&ctl_mutex);
2285
2286         kfree(rbd_dev);
2287 err_out_opt:
2288         kfree(options);
2289 err_mon_dev:
2290         kfree(mon_dev_name);
2291 err_out_mod:
2292         dout("Error adding device %s\n", buf);
2293         module_put(THIS_MODULE);
2294         return rc;
2295 }
2296
2297 static struct rbd_device *__rbd_get_dev(unsigned long id)
2298 {
2299         struct list_head *tmp;
2300         struct rbd_device *rbd_dev;
2301
2302         list_for_each(tmp, &rbd_dev_list) {
2303                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2304                 if (rbd_dev->id == id)
2305                         return rbd_dev;
2306         }
2307         return NULL;
2308 }
2309
2310 static void rbd_dev_release(struct device *dev)
2311 {
2312         struct rbd_device *rbd_dev =
2313                         container_of(dev, struct rbd_device, dev);
2314
2315         if (rbd_dev->watch_request)
2316                 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2317                                                     rbd_dev->watch_request);
2318         if (rbd_dev->watch_event)
2319                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2320
2321         rbd_put_client(rbd_dev);
2322
2323         /* clean up and free blkdev */
2324         rbd_free_disk(rbd_dev);
2325         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2326         kfree(rbd_dev);
2327
2328         /* release module ref */
2329         module_put(THIS_MODULE);
2330 }
2331
2332 static ssize_t rbd_remove(struct bus_type *bus,
2333                           const char *buf,
2334                           size_t count)
2335 {
2336         struct rbd_device *rbd_dev = NULL;
2337         int target_id, rc;
2338         unsigned long ul;
2339         int ret = count;
2340
2341         rc = strict_strtoul(buf, 10, &ul);
2342         if (rc)
2343                 return rc;
2344
2345         /* convert to int; abort if we lost anything in the conversion */
2346         target_id = (int) ul;
2347         if (target_id != ul)
2348                 return -EINVAL;
2349
2350         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2351
2352         rbd_dev = __rbd_get_dev(target_id);
2353         if (!rbd_dev) {
2354                 ret = -ENOENT;
2355                 goto done;
2356         }
2357
2358         list_del_init(&rbd_dev->node);
2359
2360         __rbd_remove_all_snaps(rbd_dev);
2361         rbd_bus_del_dev(rbd_dev);
2362
2363 done:
2364         mutex_unlock(&ctl_mutex);
2365         return ret;
2366 }
2367
2368 static ssize_t rbd_snap_add(struct device *dev,
2369                             struct device_attribute *attr,
2370                             const char *buf,
2371                             size_t count)
2372 {
2373         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2374         int ret;
2375         char *name = kmalloc(count + 1, GFP_KERNEL);
2376         if (!name)
2377                 return -ENOMEM;
2378
2379         snprintf(name, count, "%s", buf);
2380
2381         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2382
2383         ret = rbd_header_add_snap(rbd_dev,
2384                                   name, GFP_KERNEL);
2385         if (ret < 0)
2386                 goto err_unlock;
2387
2388         ret = __rbd_update_snaps(rbd_dev);
2389         if (ret < 0)
2390                 goto err_unlock;
2391
2392         /* shouldn't hold ctl_mutex when notifying.. notify might
2393            trigger a watch callback that would need to get that mutex */
2394         mutex_unlock(&ctl_mutex);
2395
2396         /* make a best effort, don't error if failed */
2397         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2398
2399         ret = count;
2400         kfree(name);
2401         return ret;
2402
2403 err_unlock:
2404         mutex_unlock(&ctl_mutex);
2405         kfree(name);
2406         return ret;
2407 }
2408
2409 static struct bus_attribute rbd_bus_attrs[] = {
2410         __ATTR(add, S_IWUSR, NULL, rbd_add),
2411         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2412         __ATTR_NULL
2413 };
2414
2415 /*
2416  * create control files in sysfs
2417  * /sys/bus/rbd/...
2418  */
2419 static int rbd_sysfs_init(void)
2420 {
2421         int ret;
2422
2423         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2424
2425         ret = bus_register(&rbd_bus_type);
2426          if (ret < 0)
2427                 return ret;
2428
2429         ret = device_register(&rbd_root_dev);
2430
2431         return ret;
2432 }
2433
2434 static void rbd_sysfs_cleanup(void)
2435 {
2436         device_unregister(&rbd_root_dev);
2437         bus_unregister(&rbd_bus_type);
2438 }
2439
2440 int __init rbd_init(void)
2441 {
2442         int rc;
2443
2444         rc = rbd_sysfs_init();
2445         if (rc)
2446                 return rc;
2447         spin_lock_init(&node_lock);
2448         pr_info("loaded " DRV_NAME_LONG "\n");
2449         return 0;
2450 }
2451
2452 void __exit rbd_exit(void)
2453 {
2454         rbd_sysfs_cleanup();
2455 }
2456
2457 module_init(rbd_init);
2458 module_exit(rbd_exit);
2459
2460 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2461 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2462 MODULE_DESCRIPTION("rados block device");
2463
2464 /* following authorship retained from original osdblk.c */
2465 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2466
2467 MODULE_LICENSE("GPL");