]> git.karo-electronics.de Git - linux-beck.git/blob - drivers/block/rbd.c
7392d7af7eab684cda3ffd6f52caa2c3b15cdebe
[linux-beck.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46
47 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
48
49 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN   64
51 #define RBD_MAX_SNAP_NAME_LEN   32
52 #define RBD_MAX_OPT_LEN         1024
53
54 #define RBD_SNAP_HEAD_NAME      "-"
55
56 #define DEV_NAME_LEN            32
57
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
60 /*
61  * block device image metadata (in-memory version)
62  */
63 struct rbd_image_header {
64         u64 image_size;
65         char block_name[32];
66         __u8 obj_order;
67         __u8 crypt_type;
68         __u8 comp_type;
69         struct rw_semaphore snap_rwsem;
70         struct ceph_snap_context *snapc;
71         size_t snap_names_len;
72         u64 snap_seq;
73         u32 total_snaps;
74
75         char *snap_names;
76         u64 *snap_sizes;
77
78         u64 obj_version;
79 };
80
81 struct rbd_options {
82         int     notify_timeout;
83 };
84
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89         struct ceph_client      *client;
90         struct rbd_options      *rbd_opts;
91         struct kref             kref;
92         struct list_head        node;
93 };
94
95 struct rbd_req_coll;
96
97 /*
98  * a single io request
99  */
100 struct rbd_request {
101         struct request          *rq;            /* blk layer request */
102         struct bio              *bio;           /* cloned bio */
103         struct page             **pages;        /* list of used pages */
104         u64                     len;
105         int                     coll_index;
106         struct rbd_req_coll     *coll;
107 };
108
109 struct rbd_req_status {
110         int done;
111         int rc;
112         u64 bytes;
113 };
114
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119         int                     total;
120         int                     num_done;
121         struct kref             kref;
122         struct rbd_req_status   status[0];
123 };
124
125 struct rbd_snap {
126         struct  device          dev;
127         const char              *name;
128         size_t                  size;
129         struct list_head        node;
130         u64                     id;
131 };
132
133 /*
134  * a single device
135  */
136 struct rbd_device {
137         int                     id;             /* blkdev unique id */
138
139         int                     major;          /* blkdev assigned major */
140         struct gendisk          *disk;          /* blkdev's gendisk and rq */
141         struct request_queue    *q;
142
143         struct ceph_client      *client;
144         struct rbd_client       *rbd_client;
145
146         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147
148         spinlock_t              lock;           /* queue lock */
149
150         struct rbd_image_header header;
151         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152         int                     obj_len;
153         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
155         int                     poolid;
156
157         struct ceph_osd_event   *watch_event;
158         struct ceph_osd_request *watch_request;
159
160         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
161         u32 cur_snap;   /* index+1 of current snapshot within snap context
162                            0 - for the head */
163         int read_only;
164
165         struct list_head        node;
166
167         /* list of snapshots */
168         struct list_head        snaps;
169
170         /* sysfs related */
171         struct device           dev;
172 };
173
174 static struct bus_type rbd_bus_type = {
175         .name           = "rbd",
176 };
177
178 static spinlock_t node_lock;      /* protects client get/put */
179
180 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list);    /* devices */
182 static LIST_HEAD(rbd_client_list);      /* clients */
183
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_rollback(struct device *dev,
187                                  struct device_attribute *attr,
188                                  const char *buf,
189                                  size_t size);
190 static ssize_t rbd_snap_add(struct device *dev,
191                             struct device_attribute *attr,
192                             const char *buf,
193                             size_t count);
194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195                                   struct rbd_snap *snap);;
196
197
198 static struct rbd_device *dev_to_rbd(struct device *dev)
199 {
200         return container_of(dev, struct rbd_device, dev);
201 }
202
203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
204 {
205         return get_device(&rbd_dev->dev);
206 }
207
208 static void rbd_put_dev(struct rbd_device *rbd_dev)
209 {
210         put_device(&rbd_dev->dev);
211 }
212
213 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
214
215 static int rbd_open(struct block_device *bdev, fmode_t mode)
216 {
217         struct gendisk *disk = bdev->bd_disk;
218         struct rbd_device *rbd_dev = disk->private_data;
219
220         rbd_get_dev(rbd_dev);
221
222         set_device_ro(bdev, rbd_dev->read_only);
223
224         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225                 return -EROFS;
226
227         return 0;
228 }
229
230 static int rbd_release(struct gendisk *disk, fmode_t mode)
231 {
232         struct rbd_device *rbd_dev = disk->private_data;
233
234         rbd_put_dev(rbd_dev);
235
236         return 0;
237 }
238
239 static const struct block_device_operations rbd_bd_ops = {
240         .owner                  = THIS_MODULE,
241         .open                   = rbd_open,
242         .release                = rbd_release,
243 };
244
245 /*
246  * Initialize an rbd client instance.
247  * We own *opt.
248  */
249 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250                                             struct rbd_options *rbd_opts)
251 {
252         struct rbd_client *rbdc;
253         int ret = -ENOMEM;
254
255         dout("rbd_client_create\n");
256         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257         if (!rbdc)
258                 goto out_opt;
259
260         kref_init(&rbdc->kref);
261         INIT_LIST_HEAD(&rbdc->node);
262
263         rbdc->client = ceph_create_client(opt, rbdc);
264         if (IS_ERR(rbdc->client))
265                 goto out_rbdc;
266         opt = NULL; /* Now rbdc->client is responsible for opt */
267
268         ret = ceph_open_session(rbdc->client);
269         if (ret < 0)
270                 goto out_err;
271
272         rbdc->rbd_opts = rbd_opts;
273
274         spin_lock(&node_lock);
275         list_add_tail(&rbdc->node, &rbd_client_list);
276         spin_unlock(&node_lock);
277
278         dout("rbd_client_create created %p\n", rbdc);
279         return rbdc;
280
281 out_err:
282         ceph_destroy_client(rbdc->client);
283 out_rbdc:
284         kfree(rbdc);
285 out_opt:
286         if (opt)
287                 ceph_destroy_options(opt);
288         return ERR_PTR(ret);
289 }
290
291 /*
292  * Find a ceph client with specific addr and configuration.
293  */
294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
295 {
296         struct rbd_client *client_node;
297
298         if (opt->flags & CEPH_OPT_NOSHARE)
299                 return NULL;
300
301         list_for_each_entry(client_node, &rbd_client_list, node)
302                 if (ceph_compare_options(opt, client_node->client) == 0)
303                         return client_node;
304         return NULL;
305 }
306
307 /*
308  * mount options
309  */
310 enum {
311         Opt_notify_timeout,
312         Opt_last_int,
313         /* int args above */
314         Opt_last_string,
315         /* string args above */
316 };
317
318 static match_table_t rbdopt_tokens = {
319         {Opt_notify_timeout, "notify_timeout=%d"},
320         /* int args above */
321         /* string args above */
322         {-1, NULL}
323 };
324
325 static int parse_rbd_opts_token(char *c, void *private)
326 {
327         struct rbd_options *rbdopt = private;
328         substring_t argstr[MAX_OPT_ARGS];
329         int token, intval, ret;
330
331         token = match_token((char *)c, rbdopt_tokens, argstr);
332         if (token < 0)
333                 return -EINVAL;
334
335         if (token < Opt_last_int) {
336                 ret = match_int(&argstr[0], &intval);
337                 if (ret < 0) {
338                         pr_err("bad mount option arg (not int) "
339                                "at '%s'\n", c);
340                         return ret;
341                 }
342                 dout("got int token %d val %d\n", token, intval);
343         } else if (token > Opt_last_int && token < Opt_last_string) {
344                 dout("got string token %d val %s\n", token,
345                      argstr[0].from);
346         } else {
347                 dout("got token %d\n", token);
348         }
349
350         switch (token) {
351         case Opt_notify_timeout:
352                 rbdopt->notify_timeout = intval;
353                 break;
354         default:
355                 BUG_ON(token);
356         }
357         return 0;
358 }
359
360 /*
361  * Get a ceph client with specific addr and configuration, if one does
362  * not exist create it.
363  */
364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365                           char *options)
366 {
367         struct rbd_client *rbdc;
368         struct ceph_options *opt;
369         int ret;
370         struct rbd_options *rbd_opts;
371
372         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373         if (!rbd_opts)
374                 return -ENOMEM;
375
376         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
377
378         ret = ceph_parse_options(&opt, options, mon_addr,
379                                  mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
380         if (ret < 0)
381                 goto done_err;
382
383         spin_lock(&node_lock);
384         rbdc = __rbd_client_find(opt);
385         if (rbdc) {
386                 ceph_destroy_options(opt);
387
388                 /* using an existing client */
389                 kref_get(&rbdc->kref);
390                 rbd_dev->rbd_client = rbdc;
391                 rbd_dev->client = rbdc->client;
392                 spin_unlock(&node_lock);
393                 return 0;
394         }
395         spin_unlock(&node_lock);
396
397         rbdc = rbd_client_create(opt, rbd_opts);
398         if (IS_ERR(rbdc)) {
399                 ret = PTR_ERR(rbdc);
400                 goto done_err;
401         }
402
403         rbd_dev->rbd_client = rbdc;
404         rbd_dev->client = rbdc->client;
405         return 0;
406 done_err:
407         kfree(rbd_opts);
408         return ret;
409 }
410
411 /*
412  * Destroy ceph client
413  */
414 static void rbd_client_release(struct kref *kref)
415 {
416         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
417
418         dout("rbd_release_client %p\n", rbdc);
419         spin_lock(&node_lock);
420         list_del(&rbdc->node);
421         spin_unlock(&node_lock);
422
423         ceph_destroy_client(rbdc->client);
424         kfree(rbdc->rbd_opts);
425         kfree(rbdc);
426 }
427
428 /*
429  * Drop reference to ceph client node. If it's not referenced anymore, release
430  * it.
431  */
432 static void rbd_put_client(struct rbd_device *rbd_dev)
433 {
434         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435         rbd_dev->rbd_client = NULL;
436         rbd_dev->client = NULL;
437 }
438
439 /*
440  * Destroy requests collection
441  */
442 static void rbd_coll_release(struct kref *kref)
443 {
444         struct rbd_req_coll *coll =
445                 container_of(kref, struct rbd_req_coll, kref);
446
447         dout("rbd_coll_release %p\n", coll);
448         kfree(coll);
449 }
450
451 /*
452  * Create a new header structure, translate header format from the on-disk
453  * header.
454  */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456                                  struct rbd_image_header_ondisk *ondisk,
457                                  int allocated_snaps,
458                                  gfp_t gfp_flags)
459 {
460         int i;
461         u32 snap_count = le32_to_cpu(ondisk->snap_count);
462         int ret = -ENOMEM;
463
464         init_rwsem(&header->snap_rwsem);
465         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467                                 snap_count *
468                                  sizeof(struct rbd_image_snap_ondisk),
469                                 gfp_flags);
470         if (!header->snapc)
471                 return -ENOMEM;
472         if (snap_count) {
473                 header->snap_names = kmalloc(header->snap_names_len,
474                                              GFP_KERNEL);
475                 if (!header->snap_names)
476                         goto err_snapc;
477                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478                                              GFP_KERNEL);
479                 if (!header->snap_sizes)
480                         goto err_names;
481         } else {
482                 header->snap_names = NULL;
483                 header->snap_sizes = NULL;
484         }
485         memcpy(header->block_name, ondisk->block_name,
486                sizeof(ondisk->block_name));
487
488         header->image_size = le64_to_cpu(ondisk->image_size);
489         header->obj_order = ondisk->options.order;
490         header->crypt_type = ondisk->options.crypt_type;
491         header->comp_type = ondisk->options.comp_type;
492
493         atomic_set(&header->snapc->nref, 1);
494         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495         header->snapc->num_snaps = snap_count;
496         header->total_snaps = snap_count;
497
498         if (snap_count &&
499             allocated_snaps == snap_count) {
500                 for (i = 0; i < snap_count; i++) {
501                         header->snapc->snaps[i] =
502                                 le64_to_cpu(ondisk->snaps[i].id);
503                         header->snap_sizes[i] =
504                                 le64_to_cpu(ondisk->snaps[i].image_size);
505                 }
506
507                 /* copy snapshot names */
508                 memcpy(header->snap_names, &ondisk->snaps[i],
509                         header->snap_names_len);
510         }
511
512         return 0;
513
514 err_names:
515         kfree(header->snap_names);
516 err_snapc:
517         kfree(header->snapc);
518         return ret;
519 }
520
521 static int snap_index(struct rbd_image_header *header, int snap_num)
522 {
523         return header->total_snaps - snap_num;
524 }
525
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
527 {
528         struct rbd_image_header *header = &rbd_dev->header;
529
530         if (!rbd_dev->cur_snap)
531                 return 0;
532
533         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534 }
535
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537                         u64 *seq, u64 *size)
538 {
539         int i;
540         char *p = header->snap_names;
541
542         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543                 if (strcmp(snap_name, p) == 0)
544                         break;
545         }
546         if (i == header->total_snaps)
547                 return -ENOENT;
548         if (seq)
549                 *seq = header->snapc->snaps[i];
550
551         if (size)
552                 *size = header->snap_sizes[i];
553
554         return i;
555 }
556
557 static int rbd_header_set_snap(struct rbd_device *dev,
558                                const char *snap_name,
559                                u64 *size)
560 {
561         struct rbd_image_header *header = &dev->header;
562         struct ceph_snap_context *snapc = header->snapc;
563         int ret = -ENOENT;
564
565         down_write(&header->snap_rwsem);
566
567         if (!snap_name ||
568             !*snap_name ||
569             strcmp(snap_name, "-") == 0 ||
570             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571                 if (header->total_snaps)
572                         snapc->seq = header->snap_seq;
573                 else
574                         snapc->seq = 0;
575                 dev->cur_snap = 0;
576                 dev->read_only = 0;
577                 if (size)
578                         *size = header->image_size;
579         } else {
580                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
581                 if (ret < 0)
582                         goto done;
583
584                 dev->cur_snap = header->total_snaps - ret;
585                 dev->read_only = 1;
586         }
587
588         ret = 0;
589 done:
590         up_write(&header->snap_rwsem);
591         return ret;
592 }
593
594 static void rbd_header_free(struct rbd_image_header *header)
595 {
596         kfree(header->snapc);
597         kfree(header->snap_names);
598         kfree(header->snap_sizes);
599 }
600
601 /*
602  * get the actual striped segment name, offset and length
603  */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605                            const char *block_name,
606                            u64 ofs, u64 len,
607                            char *seg_name, u64 *segofs)
608 {
609         u64 seg = ofs >> header->obj_order;
610
611         if (seg_name)
612                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613                          "%s.%012llx", block_name, seg);
614
615         ofs = ofs & ((1 << header->obj_order) - 1);
616         len = min_t(u64, len, (1 << header->obj_order) - ofs);
617
618         if (segofs)
619                 *segofs = ofs;
620
621         return len;
622 }
623
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625                                 u64 ofs, u64 len)
626 {
627         u64 start_seg = ofs >> header->obj_order;
628         u64 end_seg = (ofs + len - 1) >> header->obj_order;
629         return end_seg - start_seg + 1;
630 }
631
632 /*
633  * bio helpers
634  */
635
636 static void bio_chain_put(struct bio *chain)
637 {
638         struct bio *tmp;
639
640         while (chain) {
641                 tmp = chain;
642                 chain = chain->bi_next;
643                 bio_put(tmp);
644         }
645 }
646
647 /*
648  * zeros a bio chain, starting at specific offset
649  */
650 static void zero_bio_chain(struct bio *chain, int start_ofs)
651 {
652         struct bio_vec *bv;
653         unsigned long flags;
654         void *buf;
655         int i;
656         int pos = 0;
657
658         while (chain) {
659                 bio_for_each_segment(bv, chain, i) {
660                         if (pos + bv->bv_len > start_ofs) {
661                                 int remainder = max(start_ofs - pos, 0);
662                                 buf = bvec_kmap_irq(bv, &flags);
663                                 memset(buf + remainder, 0,
664                                        bv->bv_len - remainder);
665                                 bvec_kunmap_irq(buf, &flags);
666                         }
667                         pos += bv->bv_len;
668                 }
669
670                 chain = chain->bi_next;
671         }
672 }
673
674 /*
675  * bio_chain_clone - clone a chain of bios up to a certain length.
676  * might return a bio_pair that will need to be released.
677  */
678 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679                                    struct bio_pair **bp,
680                                    int len, gfp_t gfpmask)
681 {
682         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
683         int total = 0;
684
685         if (*bp) {
686                 bio_pair_release(*bp);
687                 *bp = NULL;
688         }
689
690         while (old_chain && (total < len)) {
691                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
692                 if (!tmp)
693                         goto err_out;
694
695                 if (total + old_chain->bi_size > len) {
696                         struct bio_pair *bp;
697
698                         /*
699                          * this split can only happen with a single paged bio,
700                          * split_bio will BUG_ON if this is not the case
701                          */
702                         dout("bio_chain_clone split! total=%d remaining=%d"
703                              "bi_size=%d\n",
704                              (int)total, (int)len-total,
705                              (int)old_chain->bi_size);
706
707                         /* split the bio. We'll release it either in the next
708                            call, or it will have to be released outside */
709                         bp = bio_split(old_chain, (len - total) / 512ULL);
710                         if (!bp)
711                                 goto err_out;
712
713                         __bio_clone(tmp, &bp->bio1);
714
715                         *next = &bp->bio2;
716                 } else {
717                         __bio_clone(tmp, old_chain);
718                         *next = old_chain->bi_next;
719                 }
720
721                 tmp->bi_bdev = NULL;
722                 gfpmask &= ~__GFP_WAIT;
723                 tmp->bi_next = NULL;
724
725                 if (!new_chain) {
726                         new_chain = tail = tmp;
727                 } else {
728                         tail->bi_next = tmp;
729                         tail = tmp;
730                 }
731                 old_chain = old_chain->bi_next;
732
733                 total += tmp->bi_size;
734         }
735
736         BUG_ON(total < len);
737
738         if (tail)
739                 tail->bi_next = NULL;
740
741         *old = old_chain;
742
743         return new_chain;
744
745 err_out:
746         dout("bio_chain_clone with err\n");
747         bio_chain_put(new_chain);
748         return NULL;
749 }
750
751 /*
752  * helpers for osd request op vectors.
753  */
754 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
755                             int num_ops,
756                             int opcode,
757                             u32 payload_len)
758 {
759         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
760                        GFP_NOIO);
761         if (!*ops)
762                 return -ENOMEM;
763         (*ops)[0].op = opcode;
764         /*
765          * op extent offset and length will be set later on
766          * in calc_raw_layout()
767          */
768         (*ops)[0].payload_len = payload_len;
769         return 0;
770 }
771
772 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
773 {
774         kfree(ops);
775 }
776
777 static void rbd_coll_end_req_index(struct request *rq,
778                                    struct rbd_req_coll *coll,
779                                    int index,
780                                    int ret, u64 len)
781 {
782         struct request_queue *q;
783         int min, max, i;
784
785         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786              coll, index, ret, len);
787
788         if (!rq)
789                 return;
790
791         if (!coll) {
792                 blk_end_request(rq, ret, len);
793                 return;
794         }
795
796         q = rq->q;
797
798         spin_lock_irq(q->queue_lock);
799         coll->status[index].done = 1;
800         coll->status[index].rc = ret;
801         coll->status[index].bytes = len;
802         max = min = coll->num_done;
803         while (max < coll->total && coll->status[max].done)
804                 max++;
805
806         for (i = min; i<max; i++) {
807                 __blk_end_request(rq, coll->status[i].rc,
808                                   coll->status[i].bytes);
809                 coll->num_done++;
810                 kref_put(&coll->kref, rbd_coll_release);
811         }
812         spin_unlock_irq(q->queue_lock);
813 }
814
815 static void rbd_coll_end_req(struct rbd_request *req,
816                              int ret, u64 len)
817 {
818         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
819 }
820
821 /*
822  * Send ceph osd request
823  */
824 static int rbd_do_request(struct request *rq,
825                           struct rbd_device *dev,
826                           struct ceph_snap_context *snapc,
827                           u64 snapid,
828                           const char *obj, u64 ofs, u64 len,
829                           struct bio *bio,
830                           struct page **pages,
831                           int num_pages,
832                           int flags,
833                           struct ceph_osd_req_op *ops,
834                           int num_reply,
835                           struct rbd_req_coll *coll,
836                           int coll_index,
837                           void (*rbd_cb)(struct ceph_osd_request *req,
838                                          struct ceph_msg *msg),
839                           struct ceph_osd_request **linger_req,
840                           u64 *ver)
841 {
842         struct ceph_osd_request *req;
843         struct ceph_file_layout *layout;
844         int ret;
845         u64 bno;
846         struct timespec mtime = CURRENT_TIME;
847         struct rbd_request *req_data;
848         struct ceph_osd_request_head *reqhead;
849         struct rbd_image_header *header = &dev->header;
850
851         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
852         if (!req_data) {
853                 if (coll)
854                         rbd_coll_end_req_index(rq, coll, coll_index,
855                                                -ENOMEM, len);
856                 return -ENOMEM;
857         }
858
859         if (coll) {
860                 req_data->coll = coll;
861                 req_data->coll_index = coll_index;
862         }
863
864         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
865
866         down_read(&header->snap_rwsem);
867
868         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
869                                       snapc,
870                                       ops,
871                                       false,
872                                       GFP_NOIO, pages, bio);
873         if (!req) {
874                 up_read(&header->snap_rwsem);
875                 ret = -ENOMEM;
876                 goto done_pages;
877         }
878
879         req->r_callback = rbd_cb;
880
881         req_data->rq = rq;
882         req_data->bio = bio;
883         req_data->pages = pages;
884         req_data->len = len;
885
886         req->r_priv = req_data;
887
888         reqhead = req->r_request->front.iov_base;
889         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
890
891         strncpy(req->r_oid, obj, sizeof(req->r_oid));
892         req->r_oid_len = strlen(req->r_oid);
893
894         layout = &req->r_file_layout;
895         memset(layout, 0, sizeof(*layout));
896         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897         layout->fl_stripe_count = cpu_to_le32(1);
898         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899         layout->fl_pg_preferred = cpu_to_le32(-1);
900         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902                              ofs, &len, &bno, req, ops);
903
904         ceph_osdc_build_request(req, ofs, &len,
905                                 ops,
906                                 snapc,
907                                 &mtime,
908                                 req->r_oid, req->r_oid_len);
909         up_read(&header->snap_rwsem);
910
911         if (linger_req) {
912                 ceph_osdc_set_request_linger(&dev->client->osdc, req);
913                 *linger_req = req;
914         }
915
916         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
917         if (ret < 0)
918                 goto done_err;
919
920         if (!rbd_cb) {
921                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
922                 if (ver)
923                         *ver = le64_to_cpu(req->r_reassert_version.version);
924                 dout("reassert_ver=%lld\n",
925                      le64_to_cpu(req->r_reassert_version.version));
926                 ceph_osdc_put_request(req);
927         }
928         return ret;
929
930 done_err:
931         bio_chain_put(req_data->bio);
932         ceph_osdc_put_request(req);
933 done_pages:
934         rbd_coll_end_req(req_data, ret, len);
935         kfree(req_data);
936         return ret;
937 }
938
939 /*
940  * Ceph osd op callback
941  */
942 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
943 {
944         struct rbd_request *req_data = req->r_priv;
945         struct ceph_osd_reply_head *replyhead;
946         struct ceph_osd_op *op;
947         __s32 rc;
948         u64 bytes;
949         int read_op;
950
951         /* parse reply */
952         replyhead = msg->front.iov_base;
953         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
954         op = (void *)(replyhead + 1);
955         rc = le32_to_cpu(replyhead->result);
956         bytes = le64_to_cpu(op->extent.length);
957         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
958
959         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
960
961         if (rc == -ENOENT && read_op) {
962                 zero_bio_chain(req_data->bio, 0);
963                 rc = 0;
964         } else if (rc == 0 && read_op && bytes < req_data->len) {
965                 zero_bio_chain(req_data->bio, bytes);
966                 bytes = req_data->len;
967         }
968
969         rbd_coll_end_req(req_data, rc, bytes);
970
971         if (req_data->bio)
972                 bio_chain_put(req_data->bio);
973
974         ceph_osdc_put_request(req);
975         kfree(req_data);
976 }
977
978 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
979 {
980         ceph_osdc_put_request(req);
981 }
982
983 /*
984  * Do a synchronous ceph osd operation
985  */
986 static int rbd_req_sync_op(struct rbd_device *dev,
987                            struct ceph_snap_context *snapc,
988                            u64 snapid,
989                            int opcode,
990                            int flags,
991                            struct ceph_osd_req_op *orig_ops,
992                            int num_reply,
993                            const char *obj,
994                            u64 ofs, u64 len,
995                            char *buf,
996                            struct ceph_osd_request **linger_req,
997                            u64 *ver)
998 {
999         int ret;
1000         struct page **pages;
1001         int num_pages;
1002         struct ceph_osd_req_op *ops = orig_ops;
1003         u32 payload_len;
1004
1005         num_pages = calc_pages_for(ofs , len);
1006         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1007         if (IS_ERR(pages))
1008                 return PTR_ERR(pages);
1009
1010         if (!orig_ops) {
1011                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1013                 if (ret < 0)
1014                         goto done;
1015
1016                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1018                         if (ret < 0)
1019                                 goto done_ops;
1020                 }
1021         }
1022
1023         ret = rbd_do_request(NULL, dev, snapc, snapid,
1024                           obj, ofs, len, NULL,
1025                           pages, num_pages,
1026                           flags,
1027                           ops,
1028                           2,
1029                           NULL, 0,
1030                           NULL,
1031                           linger_req, ver);
1032         if (ret < 0)
1033                 goto done_ops;
1034
1035         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1037
1038 done_ops:
1039         if (!orig_ops)
1040                 rbd_destroy_ops(ops);
1041 done:
1042         ceph_release_page_vector(pages, num_pages);
1043         return ret;
1044 }
1045
1046 /*
1047  * Do an asynchronous ceph osd operation
1048  */
1049 static int rbd_do_op(struct request *rq,
1050                      struct rbd_device *rbd_dev ,
1051                      struct ceph_snap_context *snapc,
1052                      u64 snapid,
1053                      int opcode, int flags, int num_reply,
1054                      u64 ofs, u64 len,
1055                      struct bio *bio,
1056                      struct rbd_req_coll *coll,
1057                      int coll_index)
1058 {
1059         char *seg_name;
1060         u64 seg_ofs;
1061         u64 seg_len;
1062         int ret;
1063         struct ceph_osd_req_op *ops;
1064         u32 payload_len;
1065
1066         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1067         if (!seg_name)
1068                 return -ENOMEM;
1069
1070         seg_len = rbd_get_segment(&rbd_dev->header,
1071                                   rbd_dev->header.block_name,
1072                                   ofs, len,
1073                                   seg_name, &seg_ofs);
1074
1075         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1076
1077         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1078         if (ret < 0)
1079                 goto done;
1080
1081         /* we've taken care of segment sizes earlier when we
1082            cloned the bios. We should never have a segment
1083            truncated at this point */
1084         BUG_ON(seg_len < len);
1085
1086         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087                              seg_name, seg_ofs, seg_len,
1088                              bio,
1089                              NULL, 0,
1090                              flags,
1091                              ops,
1092                              num_reply,
1093                              coll, coll_index,
1094                              rbd_req_cb, 0, NULL);
1095
1096         rbd_destroy_ops(ops);
1097 done:
1098         kfree(seg_name);
1099         return ret;
1100 }
1101
1102 /*
1103  * Request async osd write
1104  */
1105 static int rbd_req_write(struct request *rq,
1106                          struct rbd_device *rbd_dev,
1107                          struct ceph_snap_context *snapc,
1108                          u64 ofs, u64 len,
1109                          struct bio *bio,
1110                          struct rbd_req_coll *coll,
1111                          int coll_index)
1112 {
1113         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1114                          CEPH_OSD_OP_WRITE,
1115                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1116                          2,
1117                          ofs, len, bio, coll, coll_index);
1118 }
1119
1120 /*
1121  * Request async osd read
1122  */
1123 static int rbd_req_read(struct request *rq,
1124                          struct rbd_device *rbd_dev,
1125                          u64 snapid,
1126                          u64 ofs, u64 len,
1127                          struct bio *bio,
1128                          struct rbd_req_coll *coll,
1129                          int coll_index)
1130 {
1131         return rbd_do_op(rq, rbd_dev, NULL,
1132                          (snapid ? snapid : CEPH_NOSNAP),
1133                          CEPH_OSD_OP_READ,
1134                          CEPH_OSD_FLAG_READ,
1135                          2,
1136                          ofs, len, bio, coll, coll_index);
1137 }
1138
1139 /*
1140  * Request sync osd read
1141  */
1142 static int rbd_req_sync_read(struct rbd_device *dev,
1143                           struct ceph_snap_context *snapc,
1144                           u64 snapid,
1145                           const char *obj,
1146                           u64 ofs, u64 len,
1147                           char *buf,
1148                           u64 *ver)
1149 {
1150         return rbd_req_sync_op(dev, NULL,
1151                                (snapid ? snapid : CEPH_NOSNAP),
1152                                CEPH_OSD_OP_READ,
1153                                CEPH_OSD_FLAG_READ,
1154                                NULL,
1155                                1, obj, ofs, len, buf, NULL, ver);
1156 }
1157
1158 /*
1159  * Request sync osd watch
1160  */
1161 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1162                                    u64 ver,
1163                                    u64 notify_id,
1164                                    const char *obj)
1165 {
1166         struct ceph_osd_req_op *ops;
1167         struct page **pages = NULL;
1168         int ret;
1169
1170         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1171         if (ret < 0)
1172                 return ret;
1173
1174         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175         ops[0].watch.cookie = notify_id;
1176         ops[0].watch.flag = 0;
1177
1178         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1179                           obj, 0, 0, NULL,
1180                           pages, 0,
1181                           CEPH_OSD_FLAG_READ,
1182                           ops,
1183                           1,
1184                           NULL, 0,
1185                           rbd_simple_req_cb, 0, NULL);
1186
1187         rbd_destroy_ops(ops);
1188         return ret;
1189 }
1190
1191 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1192 {
1193         struct rbd_device *dev = (struct rbd_device *)data;
1194         int rc;
1195
1196         if (!dev)
1197                 return;
1198
1199         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1200                 notify_id, (int)opcode);
1201         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1202         rc = __rbd_update_snaps(dev);
1203         mutex_unlock(&ctl_mutex);
1204         if (rc)
1205                 pr_warning(DRV_NAME "%d got notification but failed to update"
1206                            " snaps: %d\n", dev->major, rc);
1207
1208         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1209 }
1210
1211 /*
1212  * Request sync osd watch
1213  */
1214 static int rbd_req_sync_watch(struct rbd_device *dev,
1215                               const char *obj,
1216                               u64 ver)
1217 {
1218         struct ceph_osd_req_op *ops;
1219         struct ceph_osd_client *osdc = &dev->client->osdc;
1220
1221         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1222         if (ret < 0)
1223                 return ret;
1224
1225         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1226                                      (void *)dev, &dev->watch_event);
1227         if (ret < 0)
1228                 goto fail;
1229
1230         ops[0].watch.ver = cpu_to_le64(ver);
1231         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1232         ops[0].watch.flag = 1;
1233
1234         ret = rbd_req_sync_op(dev, NULL,
1235                               CEPH_NOSNAP,
1236                               0,
1237                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1238                               ops,
1239                               1, obj, 0, 0, NULL,
1240                               &dev->watch_request, NULL);
1241
1242         if (ret < 0)
1243                 goto fail_event;
1244
1245         rbd_destroy_ops(ops);
1246         return 0;
1247
1248 fail_event:
1249         ceph_osdc_cancel_event(dev->watch_event);
1250         dev->watch_event = NULL;
1251 fail:
1252         rbd_destroy_ops(ops);
1253         return ret;
1254 }
1255
1256 /*
1257  * Request sync osd unwatch
1258  */
1259 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1260                                 const char *obj)
1261 {
1262         struct ceph_osd_req_op *ops;
1263
1264         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1265         if (ret < 0)
1266                 return ret;
1267
1268         ops[0].watch.ver = 0;
1269         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1270         ops[0].watch.flag = 0;
1271
1272         ret = rbd_req_sync_op(dev, NULL,
1273                               CEPH_NOSNAP,
1274                               0,
1275                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1276                               ops,
1277                               1, obj, 0, 0, NULL, NULL, NULL);
1278
1279         rbd_destroy_ops(ops);
1280         ceph_osdc_cancel_event(dev->watch_event);
1281         dev->watch_event = NULL;
1282         return ret;
1283 }
1284
1285 struct rbd_notify_info {
1286         struct rbd_device *dev;
1287 };
1288
1289 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1290 {
1291         struct rbd_device *dev = (struct rbd_device *)data;
1292         if (!dev)
1293                 return;
1294
1295         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1296                 notify_id, (int)opcode);
1297 }
1298
1299 /*
1300  * Request sync osd notify
1301  */
1302 static int rbd_req_sync_notify(struct rbd_device *dev,
1303                           const char *obj)
1304 {
1305         struct ceph_osd_req_op *ops;
1306         struct ceph_osd_client *osdc = &dev->client->osdc;
1307         struct ceph_osd_event *event;
1308         struct rbd_notify_info info;
1309         int payload_len = sizeof(u32) + sizeof(u32);
1310         int ret;
1311
1312         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1313         if (ret < 0)
1314                 return ret;
1315
1316         info.dev = dev;
1317
1318         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1319                                      (void *)&info, &event);
1320         if (ret < 0)
1321                 goto fail;
1322
1323         ops[0].watch.ver = 1;
1324         ops[0].watch.flag = 1;
1325         ops[0].watch.cookie = event->cookie;
1326         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1327         ops[0].watch.timeout = 12;
1328
1329         ret = rbd_req_sync_op(dev, NULL,
1330                                CEPH_NOSNAP,
1331                                0,
1332                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1333                                ops,
1334                                1, obj, 0, 0, NULL, NULL, NULL);
1335         if (ret < 0)
1336                 goto fail_event;
1337
1338         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1339         dout("ceph_osdc_wait_event returned %d\n", ret);
1340         rbd_destroy_ops(ops);
1341         return 0;
1342
1343 fail_event:
1344         ceph_osdc_cancel_event(event);
1345 fail:
1346         rbd_destroy_ops(ops);
1347         return ret;
1348 }
1349
1350 /*
1351  * Request sync osd rollback
1352  */
1353 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1354                                      u64 snapid,
1355                                      const char *obj)
1356 {
1357         struct ceph_osd_req_op *ops;
1358         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1359         if (ret < 0)
1360                 return ret;
1361
1362         ops[0].snap.snapid = snapid;
1363
1364         ret = rbd_req_sync_op(dev, NULL,
1365                                CEPH_NOSNAP,
1366                                0,
1367                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1368                                ops,
1369                                1, obj, 0, 0, NULL, NULL, NULL);
1370
1371         rbd_destroy_ops(ops);
1372
1373         return ret;
1374 }
1375
1376 /*
1377  * Request sync osd read
1378  */
1379 static int rbd_req_sync_exec(struct rbd_device *dev,
1380                              const char *obj,
1381                              const char *cls,
1382                              const char *method,
1383                              const char *data,
1384                              int len,
1385                              u64 *ver)
1386 {
1387         struct ceph_osd_req_op *ops;
1388         int cls_len = strlen(cls);
1389         int method_len = strlen(method);
1390         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1391                                     cls_len + method_len + len);
1392         if (ret < 0)
1393                 return ret;
1394
1395         ops[0].cls.class_name = cls;
1396         ops[0].cls.class_len = (__u8)cls_len;
1397         ops[0].cls.method_name = method;
1398         ops[0].cls.method_len = (__u8)method_len;
1399         ops[0].cls.argc = 0;
1400         ops[0].cls.indata = data;
1401         ops[0].cls.indata_len = len;
1402
1403         ret = rbd_req_sync_op(dev, NULL,
1404                                CEPH_NOSNAP,
1405                                0,
1406                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1407                                ops,
1408                                1, obj, 0, 0, NULL, NULL, ver);
1409
1410         rbd_destroy_ops(ops);
1411
1412         dout("cls_exec returned %d\n", ret);
1413         return ret;
1414 }
1415
1416 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1417 {
1418         struct rbd_req_coll *coll =
1419                         kzalloc(sizeof(struct rbd_req_coll) +
1420                                 sizeof(struct rbd_req_status) * num_reqs,
1421                                 GFP_ATOMIC);
1422
1423         if (!coll)
1424                 return NULL;
1425         coll->total = num_reqs;
1426         kref_init(&coll->kref);
1427         return coll;
1428 }
1429
1430 /*
1431  * block device queue callback
1432  */
1433 static void rbd_rq_fn(struct request_queue *q)
1434 {
1435         struct rbd_device *rbd_dev = q->queuedata;
1436         struct request *rq;
1437         struct bio_pair *bp = NULL;
1438
1439         rq = blk_fetch_request(q);
1440
1441         while (1) {
1442                 struct bio *bio;
1443                 struct bio *rq_bio, *next_bio = NULL;
1444                 bool do_write;
1445                 int size, op_size = 0;
1446                 u64 ofs;
1447                 int num_segs, cur_seg = 0;
1448                 struct rbd_req_coll *coll;
1449
1450                 /* peek at request from block layer */
1451                 if (!rq)
1452                         break;
1453
1454                 dout("fetched request\n");
1455
1456                 /* filter out block requests we don't understand */
1457                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1458                         __blk_end_request_all(rq, 0);
1459                         goto next;
1460                 }
1461
1462                 /* deduce our operation (read, write) */
1463                 do_write = (rq_data_dir(rq) == WRITE);
1464
1465                 size = blk_rq_bytes(rq);
1466                 ofs = blk_rq_pos(rq) * 512ULL;
1467                 rq_bio = rq->bio;
1468                 if (do_write && rbd_dev->read_only) {
1469                         __blk_end_request_all(rq, -EROFS);
1470                         goto next;
1471                 }
1472
1473                 spin_unlock_irq(q->queue_lock);
1474
1475                 dout("%s 0x%x bytes at 0x%llx\n",
1476                      do_write ? "write" : "read",
1477                      size, blk_rq_pos(rq) * 512ULL);
1478
1479                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1480                 coll = rbd_alloc_coll(num_segs);
1481                 if (!coll) {
1482                         spin_lock_irq(q->queue_lock);
1483                         __blk_end_request_all(rq, -ENOMEM);
1484                         goto next;
1485                 }
1486
1487                 do {
1488                         /* a bio clone to be passed down to OSD req */
1489                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1490                         op_size = rbd_get_segment(&rbd_dev->header,
1491                                                   rbd_dev->header.block_name,
1492                                                   ofs, size,
1493                                                   NULL, NULL);
1494                         kref_get(&coll->kref);
1495                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1496                                               op_size, GFP_ATOMIC);
1497                         if (!bio) {
1498                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1499                                                        -ENOMEM, op_size);
1500                                 goto next_seg;
1501                         }
1502
1503
1504                         /* init OSD command: write or read */
1505                         if (do_write)
1506                                 rbd_req_write(rq, rbd_dev,
1507                                               rbd_dev->header.snapc,
1508                                               ofs,
1509                                               op_size, bio,
1510                                               coll, cur_seg);
1511                         else
1512                                 rbd_req_read(rq, rbd_dev,
1513                                              cur_snap_id(rbd_dev),
1514                                              ofs,
1515                                              op_size, bio,
1516                                              coll, cur_seg);
1517
1518 next_seg:
1519                         size -= op_size;
1520                         ofs += op_size;
1521
1522                         cur_seg++;
1523                         rq_bio = next_bio;
1524                 } while (size > 0);
1525                 kref_put(&coll->kref, rbd_coll_release);
1526
1527                 if (bp)
1528                         bio_pair_release(bp);
1529                 spin_lock_irq(q->queue_lock);
1530 next:
1531                 rq = blk_fetch_request(q);
1532         }
1533 }
1534
1535 /*
1536  * a queue callback. Makes sure that we don't create a bio that spans across
1537  * multiple osd objects. One exception would be with a single page bios,
1538  * which we handle later at bio_chain_clone
1539  */
1540 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1541                           struct bio_vec *bvec)
1542 {
1543         struct rbd_device *rbd_dev = q->queuedata;
1544         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1545         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1546         unsigned int bio_sectors = bmd->bi_size >> 9;
1547         int max;
1548
1549         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1550                                  + bio_sectors)) << 9;
1551         if (max < 0)
1552                 max = 0; /* bio_add cannot handle a negative return */
1553         if (max <= bvec->bv_len && bio_sectors == 0)
1554                 return bvec->bv_len;
1555         return max;
1556 }
1557
1558 static void rbd_free_disk(struct rbd_device *rbd_dev)
1559 {
1560         struct gendisk *disk = rbd_dev->disk;
1561
1562         if (!disk)
1563                 return;
1564
1565         rbd_header_free(&rbd_dev->header);
1566
1567         if (disk->flags & GENHD_FL_UP)
1568                 del_gendisk(disk);
1569         if (disk->queue)
1570                 blk_cleanup_queue(disk->queue);
1571         put_disk(disk);
1572 }
1573
1574 /*
1575  * reload the ondisk the header 
1576  */
1577 static int rbd_read_header(struct rbd_device *rbd_dev,
1578                            struct rbd_image_header *header)
1579 {
1580         ssize_t rc;
1581         struct rbd_image_header_ondisk *dh;
1582         int snap_count = 0;
1583         u64 snap_names_len = 0;
1584         u64 ver;
1585
1586         while (1) {
1587                 int len = sizeof(*dh) +
1588                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1589                           snap_names_len;
1590
1591                 rc = -ENOMEM;
1592                 dh = kmalloc(len, GFP_KERNEL);
1593                 if (!dh)
1594                         return -ENOMEM;
1595
1596                 rc = rbd_req_sync_read(rbd_dev,
1597                                        NULL, CEPH_NOSNAP,
1598                                        rbd_dev->obj_md_name,
1599                                        0, len,
1600                                        (char *)dh, &ver);
1601                 if (rc < 0)
1602                         goto out_dh;
1603
1604                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1605                 if (rc < 0)
1606                         goto out_dh;
1607
1608                 if (snap_count != header->total_snaps) {
1609                         snap_count = header->total_snaps;
1610                         snap_names_len = header->snap_names_len;
1611                         rbd_header_free(header);
1612                         kfree(dh);
1613                         continue;
1614                 }
1615                 break;
1616         }
1617         header->obj_version = ver;
1618
1619 out_dh:
1620         kfree(dh);
1621         return rc;
1622 }
1623
1624 /*
1625  * create a snapshot
1626  */
1627 static int rbd_header_add_snap(struct rbd_device *dev,
1628                                const char *snap_name,
1629                                gfp_t gfp_flags)
1630 {
1631         int name_len = strlen(snap_name);
1632         u64 new_snapid;
1633         int ret;
1634         void *data, *p, *e;
1635         u64 ver;
1636
1637         /* we should create a snapshot only if we're pointing at the head */
1638         if (dev->cur_snap)
1639                 return -EINVAL;
1640
1641         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1642                                       &new_snapid);
1643         dout("created snapid=%lld\n", new_snapid);
1644         if (ret < 0)
1645                 return ret;
1646
1647         data = kmalloc(name_len + 16, gfp_flags);
1648         if (!data)
1649                 return -ENOMEM;
1650
1651         p = data;
1652         e = data + name_len + 16;
1653
1654         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1655         ceph_encode_64_safe(&p, e, new_snapid, bad);
1656
1657         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1658                                 data, p - data, &ver);
1659
1660         kfree(data);
1661
1662         if (ret < 0)
1663                 return ret;
1664
1665         dev->header.snapc->seq =  new_snapid;
1666
1667         return 0;
1668 bad:
1669         return -ERANGE;
1670 }
1671
1672 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1673 {
1674         struct rbd_snap *snap;
1675
1676         while (!list_empty(&rbd_dev->snaps)) {
1677                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1678                 __rbd_remove_snap_dev(rbd_dev, snap);
1679         }
1680 }
1681
1682 /*
1683  * only read the first part of the ondisk header, without the snaps info
1684  */
1685 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1686 {
1687         int ret;
1688         struct rbd_image_header h;
1689         u64 snap_seq;
1690         int follow_seq = 0;
1691
1692         ret = rbd_read_header(rbd_dev, &h);
1693         if (ret < 0)
1694                 return ret;
1695
1696         /* resized? */
1697         set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1698
1699         down_write(&rbd_dev->header.snap_rwsem);
1700
1701         snap_seq = rbd_dev->header.snapc->seq;
1702         if (rbd_dev->header.total_snaps &&
1703             rbd_dev->header.snapc->snaps[0] == snap_seq)
1704                 /* pointing at the head, will need to follow that
1705                    if head moves */
1706                 follow_seq = 1;
1707
1708         kfree(rbd_dev->header.snapc);
1709         kfree(rbd_dev->header.snap_names);
1710         kfree(rbd_dev->header.snap_sizes);
1711
1712         rbd_dev->header.total_snaps = h.total_snaps;
1713         rbd_dev->header.snapc = h.snapc;
1714         rbd_dev->header.snap_names = h.snap_names;
1715         rbd_dev->header.snap_names_len = h.snap_names_len;
1716         rbd_dev->header.snap_sizes = h.snap_sizes;
1717         if (follow_seq)
1718                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1719         else
1720                 rbd_dev->header.snapc->seq = snap_seq;
1721
1722         ret = __rbd_init_snaps_header(rbd_dev);
1723
1724         up_write(&rbd_dev->header.snap_rwsem);
1725
1726         return ret;
1727 }
1728
1729 static int rbd_init_disk(struct rbd_device *rbd_dev)
1730 {
1731         struct gendisk *disk;
1732         struct request_queue *q;
1733         int rc;
1734         u64 total_size = 0;
1735
1736         /* contact OSD, request size info about the object being mapped */
1737         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1738         if (rc)
1739                 return rc;
1740
1741         /* no need to lock here, as rbd_dev is not registered yet */
1742         rc = __rbd_init_snaps_header(rbd_dev);
1743         if (rc)
1744                 return rc;
1745
1746         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1747         if (rc)
1748                 return rc;
1749
1750         /* create gendisk info */
1751         rc = -ENOMEM;
1752         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1753         if (!disk)
1754                 goto out;
1755
1756         snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1757                  rbd_dev->id);
1758         disk->major = rbd_dev->major;
1759         disk->first_minor = 0;
1760         disk->fops = &rbd_bd_ops;
1761         disk->private_data = rbd_dev;
1762
1763         /* init rq */
1764         rc = -ENOMEM;
1765         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1766         if (!q)
1767                 goto out_disk;
1768         blk_queue_merge_bvec(q, rbd_merge_bvec);
1769         disk->queue = q;
1770
1771         q->queuedata = rbd_dev;
1772
1773         rbd_dev->disk = disk;
1774         rbd_dev->q = q;
1775
1776         /* finally, announce the disk to the world */
1777         set_capacity(disk, total_size / 512ULL);
1778         add_disk(disk);
1779
1780         pr_info("%s: added with size 0x%llx\n",
1781                 disk->disk_name, (unsigned long long)total_size);
1782         return 0;
1783
1784 out_disk:
1785         put_disk(disk);
1786 out:
1787         return rc;
1788 }
1789
1790 /*
1791   sysfs
1792 */
1793
1794 static ssize_t rbd_size_show(struct device *dev,
1795                              struct device_attribute *attr, char *buf)
1796 {
1797         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1798
1799         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1800 }
1801
1802 static ssize_t rbd_major_show(struct device *dev,
1803                               struct device_attribute *attr, char *buf)
1804 {
1805         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1806
1807         return sprintf(buf, "%d\n", rbd_dev->major);
1808 }
1809
1810 static ssize_t rbd_client_id_show(struct device *dev,
1811                                   struct device_attribute *attr, char *buf)
1812 {
1813         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1814
1815         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1816 }
1817
1818 static ssize_t rbd_pool_show(struct device *dev,
1819                              struct device_attribute *attr, char *buf)
1820 {
1821         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1822
1823         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1824 }
1825
1826 static ssize_t rbd_name_show(struct device *dev,
1827                              struct device_attribute *attr, char *buf)
1828 {
1829         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1830
1831         return sprintf(buf, "%s\n", rbd_dev->obj);
1832 }
1833
1834 static ssize_t rbd_snap_show(struct device *dev,
1835                              struct device_attribute *attr,
1836                              char *buf)
1837 {
1838         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1839
1840         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1841 }
1842
1843 static ssize_t rbd_image_refresh(struct device *dev,
1844                                  struct device_attribute *attr,
1845                                  const char *buf,
1846                                  size_t size)
1847 {
1848         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1849         int rc;
1850         int ret = size;
1851
1852         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1853
1854         rc = __rbd_update_snaps(rbd_dev);
1855         if (rc < 0)
1856                 ret = rc;
1857
1858         mutex_unlock(&ctl_mutex);
1859         return ret;
1860 }
1861
1862 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1863 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1864 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1865 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1866 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1867 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1868 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1869 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1870 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1871
1872 static struct attribute *rbd_attrs[] = {
1873         &dev_attr_size.attr,
1874         &dev_attr_major.attr,
1875         &dev_attr_client_id.attr,
1876         &dev_attr_pool.attr,
1877         &dev_attr_name.attr,
1878         &dev_attr_current_snap.attr,
1879         &dev_attr_refresh.attr,
1880         &dev_attr_create_snap.attr,
1881         &dev_attr_rollback_snap.attr,
1882         NULL
1883 };
1884
1885 static struct attribute_group rbd_attr_group = {
1886         .attrs = rbd_attrs,
1887 };
1888
1889 static const struct attribute_group *rbd_attr_groups[] = {
1890         &rbd_attr_group,
1891         NULL
1892 };
1893
1894 static void rbd_sysfs_dev_release(struct device *dev)
1895 {
1896 }
1897
1898 static struct device_type rbd_device_type = {
1899         .name           = "rbd",
1900         .groups         = rbd_attr_groups,
1901         .release        = rbd_sysfs_dev_release,
1902 };
1903
1904
1905 /*
1906   sysfs - snapshots
1907 */
1908
1909 static ssize_t rbd_snap_size_show(struct device *dev,
1910                                   struct device_attribute *attr,
1911                                   char *buf)
1912 {
1913         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1914
1915         return sprintf(buf, "%lld\n", (long long)snap->size);
1916 }
1917
1918 static ssize_t rbd_snap_id_show(struct device *dev,
1919                                 struct device_attribute *attr,
1920                                 char *buf)
1921 {
1922         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1923
1924         return sprintf(buf, "%lld\n", (long long)snap->id);
1925 }
1926
1927 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1928 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1929
1930 static struct attribute *rbd_snap_attrs[] = {
1931         &dev_attr_snap_size.attr,
1932         &dev_attr_snap_id.attr,
1933         NULL,
1934 };
1935
1936 static struct attribute_group rbd_snap_attr_group = {
1937         .attrs = rbd_snap_attrs,
1938 };
1939
1940 static void rbd_snap_dev_release(struct device *dev)
1941 {
1942         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1943         kfree(snap->name);
1944         kfree(snap);
1945 }
1946
1947 static const struct attribute_group *rbd_snap_attr_groups[] = {
1948         &rbd_snap_attr_group,
1949         NULL
1950 };
1951
1952 static struct device_type rbd_snap_device_type = {
1953         .groups         = rbd_snap_attr_groups,
1954         .release        = rbd_snap_dev_release,
1955 };
1956
1957 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1958                                   struct rbd_snap *snap)
1959 {
1960         list_del(&snap->node);
1961         device_unregister(&snap->dev);
1962 }
1963
1964 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1965                                   struct rbd_snap *snap,
1966                                   struct device *parent)
1967 {
1968         struct device *dev = &snap->dev;
1969         int ret;
1970
1971         dev->type = &rbd_snap_device_type;
1972         dev->parent = parent;
1973         dev->release = rbd_snap_dev_release;
1974         dev_set_name(dev, "snap_%s", snap->name);
1975         ret = device_register(dev);
1976
1977         return ret;
1978 }
1979
1980 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1981                               int i, const char *name,
1982                               struct rbd_snap **snapp)
1983 {
1984         int ret;
1985         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1986         if (!snap)
1987                 return -ENOMEM;
1988         snap->name = kstrdup(name, GFP_KERNEL);
1989         snap->size = rbd_dev->header.snap_sizes[i];
1990         snap->id = rbd_dev->header.snapc->snaps[i];
1991         if (device_is_registered(&rbd_dev->dev)) {
1992                 ret = rbd_register_snap_dev(rbd_dev, snap,
1993                                              &rbd_dev->dev);
1994                 if (ret < 0)
1995                         goto err;
1996         }
1997         *snapp = snap;
1998         return 0;
1999 err:
2000         kfree(snap->name);
2001         kfree(snap);
2002         return ret;
2003 }
2004
2005 /*
2006  * search for the previous snap in a null delimited string list
2007  */
2008 const char *rbd_prev_snap_name(const char *name, const char *start)
2009 {
2010         if (name < start + 2)
2011                 return NULL;
2012
2013         name -= 2;
2014         while (*name) {
2015                 if (name == start)
2016                         return start;
2017                 name--;
2018         }
2019         return name + 1;
2020 }
2021
2022 /*
2023  * compare the old list of snapshots that we have to what's in the header
2024  * and update it accordingly. Note that the header holds the snapshots
2025  * in a reverse order (from newest to oldest) and we need to go from
2026  * older to new so that we don't get a duplicate snap name when
2027  * doing the process (e.g., removed snapshot and recreated a new
2028  * one with the same name.
2029  */
2030 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2031 {
2032         const char *name, *first_name;
2033         int i = rbd_dev->header.total_snaps;
2034         struct rbd_snap *snap, *old_snap = NULL;
2035         int ret;
2036         struct list_head *p, *n;
2037
2038         first_name = rbd_dev->header.snap_names;
2039         name = first_name + rbd_dev->header.snap_names_len;
2040
2041         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2042                 u64 cur_id;
2043
2044                 old_snap = list_entry(p, struct rbd_snap, node);
2045
2046                 if (i)
2047                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2048
2049                 if (!i || old_snap->id < cur_id) {
2050                         /* old_snap->id was skipped, thus was removed */
2051                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2052                         continue;
2053                 }
2054                 if (old_snap->id == cur_id) {
2055                         /* we have this snapshot already */
2056                         i--;
2057                         name = rbd_prev_snap_name(name, first_name);
2058                         continue;
2059                 }
2060                 for (; i > 0;
2061                      i--, name = rbd_prev_snap_name(name, first_name)) {
2062                         if (!name) {
2063                                 WARN_ON(1);
2064                                 return -EINVAL;
2065                         }
2066                         cur_id = rbd_dev->header.snapc->snaps[i];
2067                         /* snapshot removal? handle it above */
2068                         if (cur_id >= old_snap->id)
2069                                 break;
2070                         /* a new snapshot */
2071                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2072                         if (ret < 0)
2073                                 return ret;
2074
2075                         /* note that we add it backward so using n and not p */
2076                         list_add(&snap->node, n);
2077                         p = &snap->node;
2078                 }
2079         }
2080         /* we're done going over the old snap list, just add what's left */
2081         for (; i > 0; i--) {
2082                 name = rbd_prev_snap_name(name, first_name);
2083                 if (!name) {
2084                         WARN_ON(1);
2085                         return -EINVAL;
2086                 }
2087                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2088                 if (ret < 0)
2089                         return ret;
2090                 list_add(&snap->node, &rbd_dev->snaps);
2091         }
2092
2093         return 0;
2094 }
2095
2096
2097 static void rbd_root_dev_release(struct device *dev)
2098 {
2099 }
2100
2101 static struct device rbd_root_dev = {
2102         .init_name =    "rbd",
2103         .release =      rbd_root_dev_release,
2104 };
2105
2106 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2107 {
2108         int ret = -ENOMEM;
2109         struct device *dev;
2110         struct rbd_snap *snap;
2111
2112         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2113         dev = &rbd_dev->dev;
2114
2115         dev->bus = &rbd_bus_type;
2116         dev->type = &rbd_device_type;
2117         dev->parent = &rbd_root_dev;
2118         dev->release = rbd_dev_release;
2119         dev_set_name(dev, "%d", rbd_dev->id);
2120         ret = device_register(dev);
2121         if (ret < 0)
2122                 goto done_free;
2123
2124         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2125                 ret = rbd_register_snap_dev(rbd_dev, snap,
2126                                              &rbd_dev->dev);
2127                 if (ret < 0)
2128                         break;
2129         }
2130
2131         mutex_unlock(&ctl_mutex);
2132         return 0;
2133 done_free:
2134         mutex_unlock(&ctl_mutex);
2135         return ret;
2136 }
2137
2138 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2139 {
2140         device_unregister(&rbd_dev->dev);
2141 }
2142
2143 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2144 {
2145         int ret, rc;
2146
2147         do {
2148                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2149                                          rbd_dev->header.obj_version);
2150                 if (ret == -ERANGE) {
2151                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2152                         rc = __rbd_update_snaps(rbd_dev);
2153                         mutex_unlock(&ctl_mutex);
2154                         if (rc < 0)
2155                                 return rc;
2156                 }
2157         } while (ret == -ERANGE);
2158
2159         return ret;
2160 }
2161
2162 static ssize_t rbd_add(struct bus_type *bus,
2163                        const char *buf,
2164                        size_t count)
2165 {
2166         struct ceph_osd_client *osdc;
2167         struct rbd_device *rbd_dev;
2168         ssize_t rc = -ENOMEM;
2169         int irc, new_id = 0;
2170         struct list_head *tmp;
2171         char *mon_dev_name;
2172         char *options;
2173
2174         if (!try_module_get(THIS_MODULE))
2175                 return -ENODEV;
2176
2177         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2178         if (!mon_dev_name)
2179                 goto err_out_mod;
2180
2181         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2182         if (!options)
2183                 goto err_mon_dev;
2184
2185         /* new rbd_device object */
2186         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2187         if (!rbd_dev)
2188                 goto err_out_opt;
2189
2190         /* static rbd_device initialization */
2191         spin_lock_init(&rbd_dev->lock);
2192         INIT_LIST_HEAD(&rbd_dev->node);
2193         INIT_LIST_HEAD(&rbd_dev->snaps);
2194
2195         /* generate unique id: find highest unique id, add one */
2196         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2197
2198         list_for_each(tmp, &rbd_dev_list) {
2199                 struct rbd_device *rbd_dev;
2200
2201                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2202                 if (rbd_dev->id >= new_id)
2203                         new_id = rbd_dev->id + 1;
2204         }
2205
2206         rbd_dev->id = new_id;
2207
2208         /* add to global list */
2209         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2210
2211         /* parse add command */
2212         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2213                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2214                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2215                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2216                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2217                    mon_dev_name, options, rbd_dev->pool_name,
2218                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2219                 rc = -EINVAL;
2220                 goto err_out_slot;
2221         }
2222
2223         if (rbd_dev->snap_name[0] == 0)
2224                 rbd_dev->snap_name[0] = '-';
2225
2226         rbd_dev->obj_len = strlen(rbd_dev->obj);
2227         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2228                  rbd_dev->obj, RBD_SUFFIX);
2229
2230         /* initialize rest of new object */
2231         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2232         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2233         if (rc < 0)
2234                 goto err_out_slot;
2235
2236         mutex_unlock(&ctl_mutex);
2237
2238         /* pick the pool */
2239         osdc = &rbd_dev->client->osdc;
2240         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2241         if (rc < 0)
2242                 goto err_out_client;
2243         rbd_dev->poolid = rc;
2244
2245         /* register our block device */
2246         irc = register_blkdev(0, rbd_dev->name);
2247         if (irc < 0) {
2248                 rc = irc;
2249                 goto err_out_client;
2250         }
2251         rbd_dev->major = irc;
2252
2253         rc = rbd_bus_add_dev(rbd_dev);
2254         if (rc)
2255                 goto err_out_blkdev;
2256
2257         /* set up and announce blkdev mapping */
2258         rc = rbd_init_disk(rbd_dev);
2259         if (rc)
2260                 goto err_out_bus;
2261
2262         rc = rbd_init_watch_dev(rbd_dev);
2263         if (rc)
2264                 goto err_out_bus;
2265
2266         return count;
2267
2268 err_out_bus:
2269         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2270         list_del_init(&rbd_dev->node);
2271         mutex_unlock(&ctl_mutex);
2272
2273         /* this will also clean up rest of rbd_dev stuff */
2274
2275         rbd_bus_del_dev(rbd_dev);
2276         kfree(options);
2277         kfree(mon_dev_name);
2278         return rc;
2279
2280 err_out_blkdev:
2281         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2282 err_out_client:
2283         rbd_put_client(rbd_dev);
2284         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2285 err_out_slot:
2286         list_del_init(&rbd_dev->node);
2287         mutex_unlock(&ctl_mutex);
2288
2289         kfree(rbd_dev);
2290 err_out_opt:
2291         kfree(options);
2292 err_mon_dev:
2293         kfree(mon_dev_name);
2294 err_out_mod:
2295         dout("Error adding device %s\n", buf);
2296         module_put(THIS_MODULE);
2297         return rc;
2298 }
2299
2300 static struct rbd_device *__rbd_get_dev(unsigned long id)
2301 {
2302         struct list_head *tmp;
2303         struct rbd_device *rbd_dev;
2304
2305         list_for_each(tmp, &rbd_dev_list) {
2306                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2307                 if (rbd_dev->id == id)
2308                         return rbd_dev;
2309         }
2310         return NULL;
2311 }
2312
2313 static void rbd_dev_release(struct device *dev)
2314 {
2315         struct rbd_device *rbd_dev =
2316                         container_of(dev, struct rbd_device, dev);
2317
2318         if (rbd_dev->watch_request)
2319                 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2320                                                     rbd_dev->watch_request);
2321         if (rbd_dev->watch_event)
2322                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2323
2324         rbd_put_client(rbd_dev);
2325
2326         /* clean up and free blkdev */
2327         rbd_free_disk(rbd_dev);
2328         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2329         kfree(rbd_dev);
2330
2331         /* release module ref */
2332         module_put(THIS_MODULE);
2333 }
2334
2335 static ssize_t rbd_remove(struct bus_type *bus,
2336                           const char *buf,
2337                           size_t count)
2338 {
2339         struct rbd_device *rbd_dev = NULL;
2340         int target_id, rc;
2341         unsigned long ul;
2342         int ret = count;
2343
2344         rc = strict_strtoul(buf, 10, &ul);
2345         if (rc)
2346                 return rc;
2347
2348         /* convert to int; abort if we lost anything in the conversion */
2349         target_id = (int) ul;
2350         if (target_id != ul)
2351                 return -EINVAL;
2352
2353         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2354
2355         rbd_dev = __rbd_get_dev(target_id);
2356         if (!rbd_dev) {
2357                 ret = -ENOENT;
2358                 goto done;
2359         }
2360
2361         list_del_init(&rbd_dev->node);
2362
2363         __rbd_remove_all_snaps(rbd_dev);
2364         rbd_bus_del_dev(rbd_dev);
2365
2366 done:
2367         mutex_unlock(&ctl_mutex);
2368         return ret;
2369 }
2370
2371 static ssize_t rbd_snap_add(struct device *dev,
2372                             struct device_attribute *attr,
2373                             const char *buf,
2374                             size_t count)
2375 {
2376         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2377         int ret;
2378         char *name = kmalloc(count + 1, GFP_KERNEL);
2379         if (!name)
2380                 return -ENOMEM;
2381
2382         snprintf(name, count, "%s", buf);
2383
2384         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2385
2386         ret = rbd_header_add_snap(rbd_dev,
2387                                   name, GFP_KERNEL);
2388         if (ret < 0)
2389                 goto err_unlock;
2390
2391         ret = __rbd_update_snaps(rbd_dev);
2392         if (ret < 0)
2393                 goto err_unlock;
2394
2395         /* shouldn't hold ctl_mutex when notifying.. notify might
2396            trigger a watch callback that would need to get that mutex */
2397         mutex_unlock(&ctl_mutex);
2398
2399         /* make a best effort, don't error if failed */
2400         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2401
2402         ret = count;
2403         kfree(name);
2404         return ret;
2405
2406 err_unlock:
2407         mutex_unlock(&ctl_mutex);
2408         kfree(name);
2409         return ret;
2410 }
2411
2412 static ssize_t rbd_snap_rollback(struct device *dev,
2413                                  struct device_attribute *attr,
2414                                  const char *buf,
2415                                  size_t count)
2416 {
2417         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2418         int ret;
2419         u64 snapid;
2420         u64 cur_ofs;
2421         char *seg_name = NULL;
2422         char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2423         ret = -ENOMEM;
2424         if (!snap_name)
2425                 return ret;
2426
2427         /* parse snaps add command */
2428         snprintf(snap_name, count, "%s", buf);
2429         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2430         if (!seg_name)
2431                 goto done;
2432
2433         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2434
2435         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2436         if (ret < 0)
2437                 goto done_unlock;
2438
2439         dout("snapid=%lld\n", snapid);
2440
2441         cur_ofs = 0;
2442         while (cur_ofs < rbd_dev->header.image_size) {
2443                 cur_ofs += rbd_get_segment(&rbd_dev->header,
2444                                            rbd_dev->obj,
2445                                            cur_ofs, (u64)-1,
2446                                            seg_name, NULL);
2447                 dout("seg_name=%s\n", seg_name);
2448
2449                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2450                 if (ret < 0)
2451                         pr_warning("could not roll back obj %s err=%d\n",
2452                                    seg_name, ret);
2453         }
2454
2455         ret = __rbd_update_snaps(rbd_dev);
2456         if (ret < 0)
2457                 goto done_unlock;
2458
2459         ret = count;
2460
2461 done_unlock:
2462         mutex_unlock(&ctl_mutex);
2463 done:
2464         kfree(seg_name);
2465         kfree(snap_name);
2466
2467         return ret;
2468 }
2469
2470 static struct bus_attribute rbd_bus_attrs[] = {
2471         __ATTR(add, S_IWUSR, NULL, rbd_add),
2472         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2473         __ATTR_NULL
2474 };
2475
2476 /*
2477  * create control files in sysfs
2478  * /sys/bus/rbd/...
2479  */
2480 static int rbd_sysfs_init(void)
2481 {
2482         int ret;
2483
2484         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2485
2486         ret = bus_register(&rbd_bus_type);
2487          if (ret < 0)
2488                 return ret;
2489
2490         ret = device_register(&rbd_root_dev);
2491
2492         return ret;
2493 }
2494
2495 static void rbd_sysfs_cleanup(void)
2496 {
2497         device_unregister(&rbd_root_dev);
2498         bus_unregister(&rbd_bus_type);
2499 }
2500
2501 int __init rbd_init(void)
2502 {
2503         int rc;
2504
2505         rc = rbd_sysfs_init();
2506         if (rc)
2507                 return rc;
2508         spin_lock_init(&node_lock);
2509         pr_info("loaded " DRV_NAME_LONG "\n");
2510         return 0;
2511 }
2512
2513 void __exit rbd_exit(void)
2514 {
2515         rbd_sysfs_cleanup();
2516 }
2517
2518 module_init(rbd_init);
2519 module_exit(rbd_exit);
2520
2521 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2522 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2523 MODULE_DESCRIPTION("rados block device");
2524
2525 /* following authorship retained from original osdblk.c */
2526 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2527
2528 MODULE_LICENSE("GPL");