]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: dynamically allocate image name
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u64 snap_seq;
86         u32 total_snaps;
87
88         char *snap_names;
89         u64 *snap_sizes;
90
91         u64 obj_version;
92 };
93
94 struct rbd_options {
95         int     notify_timeout;
96 };
97
98 /*
99  * an instance of the client.  multiple devices may share an rbd client.
100  */
101 struct rbd_client {
102         struct ceph_client      *client;
103         struct rbd_options      *rbd_opts;
104         struct kref             kref;
105         struct list_head        node;
106 };
107
108 /*
109  * a request completion status
110  */
111 struct rbd_req_status {
112         int done;
113         int rc;
114         u64 bytes;
115 };
116
117 /*
118  * a collection of requests
119  */
120 struct rbd_req_coll {
121         int                     total;
122         int                     num_done;
123         struct kref             kref;
124         struct rbd_req_status   status[0];
125 };
126
127 /*
128  * a single io request
129  */
130 struct rbd_request {
131         struct request          *rq;            /* blk layer request */
132         struct bio              *bio;           /* cloned bio */
133         struct page             **pages;        /* list of used pages */
134         u64                     len;
135         int                     coll_index;
136         struct rbd_req_coll     *coll;
137 };
138
139 struct rbd_snap {
140         struct  device          dev;
141         const char              *name;
142         u64                     size;
143         struct list_head        node;
144         u64                     id;
145 };
146
147 /*
148  * a single device
149  */
150 struct rbd_device {
151         int                     id;             /* blkdev unique id */
152
153         int                     major;          /* blkdev assigned major */
154         struct gendisk          *disk;          /* blkdev's gendisk and rq */
155         struct request_queue    *q;
156
157         struct rbd_client       *rbd_client;
158
159         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161         spinlock_t              lock;           /* queue lock */
162
163         struct rbd_image_header header;
164         char                    *obj; /* rbd image name */
165         size_t                  obj_len;
166         char                    *obj_md_name; /* hdr nm. */
167         char                    *pool_name;
168         int                     pool_id;
169
170         struct ceph_osd_event   *watch_event;
171         struct ceph_osd_request *watch_request;
172
173         /* protects updating the header */
174         struct rw_semaphore     header_rwsem;
175         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
176         u64                     snap_id;        /* current snapshot id */
177         int read_only;
178
179         struct list_head        node;
180
181         /* list of snapshots */
182         struct list_head        snaps;
183
184         /* sysfs related */
185         struct device           dev;
186 };
187
188 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
189
190 static LIST_HEAD(rbd_dev_list);    /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock);
192
193 static LIST_HEAD(rbd_client_list);              /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock);
195
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199                             struct device_attribute *attr,
200                             const char *buf,
201                             size_t count);
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203                                   struct rbd_snap *snap);
204
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
206                        size_t count);
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
208                           size_t count);
209
210 static struct bus_attribute rbd_bus_attrs[] = {
211         __ATTR(add, S_IWUSR, NULL, rbd_add),
212         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
213         __ATTR_NULL
214 };
215
216 static struct bus_type rbd_bus_type = {
217         .name           = "rbd",
218         .bus_attrs      = rbd_bus_attrs,
219 };
220
221 static void rbd_root_dev_release(struct device *dev)
222 {
223 }
224
225 static struct device rbd_root_dev = {
226         .init_name =    "rbd",
227         .release =      rbd_root_dev_release,
228 };
229
230
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
232 {
233         return get_device(&rbd_dev->dev);
234 }
235
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
237 {
238         put_device(&rbd_dev->dev);
239 }
240
241 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
242
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
244 {
245         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
246
247         rbd_get_dev(rbd_dev);
248
249         set_device_ro(bdev, rbd_dev->read_only);
250
251         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252                 return -EROFS;
253
254         return 0;
255 }
256
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
258 {
259         struct rbd_device *rbd_dev = disk->private_data;
260
261         rbd_put_dev(rbd_dev);
262
263         return 0;
264 }
265
266 static const struct block_device_operations rbd_bd_ops = {
267         .owner                  = THIS_MODULE,
268         .open                   = rbd_open,
269         .release                = rbd_release,
270 };
271
272 /*
273  * Initialize an rbd client instance.
274  * We own *opt.
275  */
276 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
277                                             struct rbd_options *rbd_opts)
278 {
279         struct rbd_client *rbdc;
280         int ret = -ENOMEM;
281
282         dout("rbd_client_create\n");
283         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
284         if (!rbdc)
285                 goto out_opt;
286
287         kref_init(&rbdc->kref);
288         INIT_LIST_HEAD(&rbdc->node);
289
290         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291
292         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
293         if (IS_ERR(rbdc->client))
294                 goto out_mutex;
295         opt = NULL; /* Now rbdc->client is responsible for opt */
296
297         ret = ceph_open_session(rbdc->client);
298         if (ret < 0)
299                 goto out_err;
300
301         rbdc->rbd_opts = rbd_opts;
302
303         spin_lock(&rbd_client_list_lock);
304         list_add_tail(&rbdc->node, &rbd_client_list);
305         spin_unlock(&rbd_client_list_lock);
306
307         mutex_unlock(&ctl_mutex);
308
309         dout("rbd_client_create created %p\n", rbdc);
310         return rbdc;
311
312 out_err:
313         ceph_destroy_client(rbdc->client);
314 out_mutex:
315         mutex_unlock(&ctl_mutex);
316         kfree(rbdc);
317 out_opt:
318         if (opt)
319                 ceph_destroy_options(opt);
320         return ERR_PTR(ret);
321 }
322
323 /*
324  * Find a ceph client with specific addr and configuration.
325  */
326 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
327 {
328         struct rbd_client *client_node;
329
330         if (opt->flags & CEPH_OPT_NOSHARE)
331                 return NULL;
332
333         list_for_each_entry(client_node, &rbd_client_list, node)
334                 if (ceph_compare_options(opt, client_node->client) == 0)
335                         return client_node;
336         return NULL;
337 }
338
339 /*
340  * mount options
341  */
342 enum {
343         Opt_notify_timeout,
344         Opt_last_int,
345         /* int args above */
346         Opt_last_string,
347         /* string args above */
348 };
349
350 static match_table_t rbdopt_tokens = {
351         {Opt_notify_timeout, "notify_timeout=%d"},
352         /* int args above */
353         /* string args above */
354         {-1, NULL}
355 };
356
357 static int parse_rbd_opts_token(char *c, void *private)
358 {
359         struct rbd_options *rbdopt = private;
360         substring_t argstr[MAX_OPT_ARGS];
361         int token, intval, ret;
362
363         token = match_token(c, rbdopt_tokens, argstr);
364         if (token < 0)
365                 return -EINVAL;
366
367         if (token < Opt_last_int) {
368                 ret = match_int(&argstr[0], &intval);
369                 if (ret < 0) {
370                         pr_err("bad mount option arg (not int) "
371                                "at '%s'\n", c);
372                         return ret;
373                 }
374                 dout("got int token %d val %d\n", token, intval);
375         } else if (token > Opt_last_int && token < Opt_last_string) {
376                 dout("got string token %d val %s\n", token,
377                      argstr[0].from);
378         } else {
379                 dout("got token %d\n", token);
380         }
381
382         switch (token) {
383         case Opt_notify_timeout:
384                 rbdopt->notify_timeout = intval;
385                 break;
386         default:
387                 BUG_ON(token);
388         }
389         return 0;
390 }
391
392 /*
393  * Get a ceph client with specific addr and configuration, if one does
394  * not exist create it.
395  */
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
397                                          size_t mon_addr_len,
398                                          char *options)
399 {
400         struct rbd_client *rbdc;
401         struct ceph_options *opt;
402         struct rbd_options *rbd_opts;
403
404         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
405         if (!rbd_opts)
406                 return ERR_PTR(-ENOMEM);
407
408         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
409
410         opt = ceph_parse_options(options, mon_addr,
411                                 mon_addr + mon_addr_len,
412                                 parse_rbd_opts_token, rbd_opts);
413         if (IS_ERR(opt)) {
414                 kfree(rbd_opts);
415                 return ERR_CAST(opt);
416         }
417
418         spin_lock(&rbd_client_list_lock);
419         rbdc = __rbd_client_find(opt);
420         if (rbdc) {
421                 /* using an existing client */
422                 kref_get(&rbdc->kref);
423                 spin_unlock(&rbd_client_list_lock);
424
425                 ceph_destroy_options(opt);
426                 kfree(rbd_opts);
427
428                 return rbdc;
429         }
430         spin_unlock(&rbd_client_list_lock);
431
432         rbdc = rbd_client_create(opt, rbd_opts);
433
434         if (IS_ERR(rbdc))
435                 kfree(rbd_opts);
436
437         return rbdc;
438 }
439
440 /*
441  * Destroy ceph client
442  *
443  * Caller must hold rbd_client_list_lock.
444  */
445 static void rbd_client_release(struct kref *kref)
446 {
447         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
448
449         dout("rbd_release_client %p\n", rbdc);
450         spin_lock(&rbd_client_list_lock);
451         list_del(&rbdc->node);
452         spin_unlock(&rbd_client_list_lock);
453
454         ceph_destroy_client(rbdc->client);
455         kfree(rbdc->rbd_opts);
456         kfree(rbdc);
457 }
458
459 /*
460  * Drop reference to ceph client node. If it's not referenced anymore, release
461  * it.
462  */
463 static void rbd_put_client(struct rbd_device *rbd_dev)
464 {
465         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466         rbd_dev->rbd_client = NULL;
467 }
468
469 /*
470  * Destroy requests collection
471  */
472 static void rbd_coll_release(struct kref *kref)
473 {
474         struct rbd_req_coll *coll =
475                 container_of(kref, struct rbd_req_coll, kref);
476
477         dout("rbd_coll_release %p\n", coll);
478         kfree(coll);
479 }
480
481 /*
482  * Create a new header structure, translate header format from the on-disk
483  * header.
484  */
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486                                  struct rbd_image_header_ondisk *ondisk,
487                                  u32 allocated_snaps,
488                                  gfp_t gfp_flags)
489 {
490         u32 i, snap_count;
491
492         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
493                 return -ENXIO;
494
495         snap_count = le32_to_cpu(ondisk->snap_count);
496         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
497                          / sizeof (*ondisk))
498                 return -EINVAL;
499         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500                                 snap_count * sizeof(u64),
501                                 gfp_flags);
502         if (!header->snapc)
503                 return -ENOMEM;
504
505         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
506         if (snap_count) {
507                 header->snap_names = kmalloc(header->snap_names_len,
508                                              gfp_flags);
509                 if (!header->snap_names)
510                         goto err_snapc;
511                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
512                                              gfp_flags);
513                 if (!header->snap_sizes)
514                         goto err_names;
515         } else {
516                 header->snap_names = NULL;
517                 header->snap_sizes = NULL;
518         }
519
520         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
521                                         gfp_flags);
522         if (!header->object_prefix)
523                 goto err_sizes;
524
525         memcpy(header->object_prefix, ondisk->block_name,
526                sizeof(ondisk->block_name));
527         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
528
529         header->image_size = le64_to_cpu(ondisk->image_size);
530         header->obj_order = ondisk->options.order;
531         header->crypt_type = ondisk->options.crypt_type;
532         header->comp_type = ondisk->options.comp_type;
533
534         atomic_set(&header->snapc->nref, 1);
535         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536         header->snapc->num_snaps = snap_count;
537         header->total_snaps = snap_count;
538
539         if (snap_count && allocated_snaps == snap_count) {
540                 for (i = 0; i < snap_count; i++) {
541                         header->snapc->snaps[i] =
542                                 le64_to_cpu(ondisk->snaps[i].id);
543                         header->snap_sizes[i] =
544                                 le64_to_cpu(ondisk->snaps[i].image_size);
545                 }
546
547                 /* copy snapshot names */
548                 memcpy(header->snap_names, &ondisk->snaps[i],
549                         header->snap_names_len);
550         }
551
552         return 0;
553
554 err_sizes:
555         kfree(header->snap_sizes);
556 err_names:
557         kfree(header->snap_names);
558 err_snapc:
559         kfree(header->snapc);
560         return -ENOMEM;
561 }
562
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
564                         u64 *seq, u64 *size)
565 {
566         int i;
567         char *p = header->snap_names;
568
569         for (i = 0; i < header->total_snaps; i++) {
570                 if (!strcmp(snap_name, p)) {
571
572                         /* Found it.  Pass back its id and/or size */
573
574                         if (seq)
575                                 *seq = header->snapc->snaps[i];
576                         if (size)
577                                 *size = header->snap_sizes[i];
578                         return i;
579                 }
580                 p += strlen(p) + 1;     /* Skip ahead to the next name */
581         }
582         return -ENOENT;
583 }
584
585 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
586 {
587         struct rbd_image_header *header = &dev->header;
588         struct ceph_snap_context *snapc = header->snapc;
589         int ret = -ENOENT;
590
591         BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
592
593         down_write(&dev->header_rwsem);
594
595         if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
596                     sizeof (RBD_SNAP_HEAD_NAME))) {
597                 if (header->total_snaps)
598                         snapc->seq = header->snap_seq;
599                 else
600                         snapc->seq = 0;
601                 dev->snap_id = CEPH_NOSNAP;
602                 dev->read_only = 0;
603                 if (size)
604                         *size = header->image_size;
605         } else {
606                 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
607                 if (ret < 0)
608                         goto done;
609                 dev->snap_id = snapc->seq;
610                 dev->read_only = 1;
611         }
612
613         ret = 0;
614 done:
615         up_write(&dev->header_rwsem);
616         return ret;
617 }
618
619 static void rbd_header_free(struct rbd_image_header *header)
620 {
621         kfree(header->object_prefix);
622         kfree(header->snap_sizes);
623         kfree(header->snap_names);
624         kfree(header->snapc);
625 }
626
627 /*
628  * get the actual striped segment name, offset and length
629  */
630 static u64 rbd_get_segment(struct rbd_image_header *header,
631                            const char *object_prefix,
632                            u64 ofs, u64 len,
633                            char *seg_name, u64 *segofs)
634 {
635         u64 seg = ofs >> header->obj_order;
636
637         if (seg_name)
638                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
639                          "%s.%012llx", object_prefix, seg);
640
641         ofs = ofs & ((1 << header->obj_order) - 1);
642         len = min_t(u64, len, (1 << header->obj_order) - ofs);
643
644         if (segofs)
645                 *segofs = ofs;
646
647         return len;
648 }
649
650 static int rbd_get_num_segments(struct rbd_image_header *header,
651                                 u64 ofs, u64 len)
652 {
653         u64 start_seg = ofs >> header->obj_order;
654         u64 end_seg = (ofs + len - 1) >> header->obj_order;
655         return end_seg - start_seg + 1;
656 }
657
658 /*
659  * returns the size of an object in the image
660  */
661 static u64 rbd_obj_bytes(struct rbd_image_header *header)
662 {
663         return 1 << header->obj_order;
664 }
665
666 /*
667  * bio helpers
668  */
669
670 static void bio_chain_put(struct bio *chain)
671 {
672         struct bio *tmp;
673
674         while (chain) {
675                 tmp = chain;
676                 chain = chain->bi_next;
677                 bio_put(tmp);
678         }
679 }
680
681 /*
682  * zeros a bio chain, starting at specific offset
683  */
684 static void zero_bio_chain(struct bio *chain, int start_ofs)
685 {
686         struct bio_vec *bv;
687         unsigned long flags;
688         void *buf;
689         int i;
690         int pos = 0;
691
692         while (chain) {
693                 bio_for_each_segment(bv, chain, i) {
694                         if (pos + bv->bv_len > start_ofs) {
695                                 int remainder = max(start_ofs - pos, 0);
696                                 buf = bvec_kmap_irq(bv, &flags);
697                                 memset(buf + remainder, 0,
698                                        bv->bv_len - remainder);
699                                 bvec_kunmap_irq(buf, &flags);
700                         }
701                         pos += bv->bv_len;
702                 }
703
704                 chain = chain->bi_next;
705         }
706 }
707
708 /*
709  * bio_chain_clone - clone a chain of bios up to a certain length.
710  * might return a bio_pair that will need to be released.
711  */
712 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
713                                    struct bio_pair **bp,
714                                    int len, gfp_t gfpmask)
715 {
716         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
717         int total = 0;
718
719         if (*bp) {
720                 bio_pair_release(*bp);
721                 *bp = NULL;
722         }
723
724         while (old_chain && (total < len)) {
725                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
726                 if (!tmp)
727                         goto err_out;
728
729                 if (total + old_chain->bi_size > len) {
730                         struct bio_pair *bp;
731
732                         /*
733                          * this split can only happen with a single paged bio,
734                          * split_bio will BUG_ON if this is not the case
735                          */
736                         dout("bio_chain_clone split! total=%d remaining=%d"
737                              "bi_size=%d\n",
738                              (int)total, (int)len-total,
739                              (int)old_chain->bi_size);
740
741                         /* split the bio. We'll release it either in the next
742                            call, or it will have to be released outside */
743                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
744                         if (!bp)
745                                 goto err_out;
746
747                         __bio_clone(tmp, &bp->bio1);
748
749                         *next = &bp->bio2;
750                 } else {
751                         __bio_clone(tmp, old_chain);
752                         *next = old_chain->bi_next;
753                 }
754
755                 tmp->bi_bdev = NULL;
756                 gfpmask &= ~__GFP_WAIT;
757                 tmp->bi_next = NULL;
758
759                 if (!new_chain) {
760                         new_chain = tail = tmp;
761                 } else {
762                         tail->bi_next = tmp;
763                         tail = tmp;
764                 }
765                 old_chain = old_chain->bi_next;
766
767                 total += tmp->bi_size;
768         }
769
770         BUG_ON(total < len);
771
772         if (tail)
773                 tail->bi_next = NULL;
774
775         *old = old_chain;
776
777         return new_chain;
778
779 err_out:
780         dout("bio_chain_clone with err\n");
781         bio_chain_put(new_chain);
782         return NULL;
783 }
784
785 /*
786  * helpers for osd request op vectors.
787  */
788 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
789                             int num_ops,
790                             int opcode,
791                             u32 payload_len)
792 {
793         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
794                        GFP_NOIO);
795         if (!*ops)
796                 return -ENOMEM;
797         (*ops)[0].op = opcode;
798         /*
799          * op extent offset and length will be set later on
800          * in calc_raw_layout()
801          */
802         (*ops)[0].payload_len = payload_len;
803         return 0;
804 }
805
806 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
807 {
808         kfree(ops);
809 }
810
811 static void rbd_coll_end_req_index(struct request *rq,
812                                    struct rbd_req_coll *coll,
813                                    int index,
814                                    int ret, u64 len)
815 {
816         struct request_queue *q;
817         int min, max, i;
818
819         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
820              coll, index, ret, len);
821
822         if (!rq)
823                 return;
824
825         if (!coll) {
826                 blk_end_request(rq, ret, len);
827                 return;
828         }
829
830         q = rq->q;
831
832         spin_lock_irq(q->queue_lock);
833         coll->status[index].done = 1;
834         coll->status[index].rc = ret;
835         coll->status[index].bytes = len;
836         max = min = coll->num_done;
837         while (max < coll->total && coll->status[max].done)
838                 max++;
839
840         for (i = min; i<max; i++) {
841                 __blk_end_request(rq, coll->status[i].rc,
842                                   coll->status[i].bytes);
843                 coll->num_done++;
844                 kref_put(&coll->kref, rbd_coll_release);
845         }
846         spin_unlock_irq(q->queue_lock);
847 }
848
849 static void rbd_coll_end_req(struct rbd_request *req,
850                              int ret, u64 len)
851 {
852         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
853 }
854
855 /*
856  * Send ceph osd request
857  */
858 static int rbd_do_request(struct request *rq,
859                           struct rbd_device *dev,
860                           struct ceph_snap_context *snapc,
861                           u64 snapid,
862                           const char *obj, u64 ofs, u64 len,
863                           struct bio *bio,
864                           struct page **pages,
865                           int num_pages,
866                           int flags,
867                           struct ceph_osd_req_op *ops,
868                           int num_reply,
869                           struct rbd_req_coll *coll,
870                           int coll_index,
871                           void (*rbd_cb)(struct ceph_osd_request *req,
872                                          struct ceph_msg *msg),
873                           struct ceph_osd_request **linger_req,
874                           u64 *ver)
875 {
876         struct ceph_osd_request *req;
877         struct ceph_file_layout *layout;
878         int ret;
879         u64 bno;
880         struct timespec mtime = CURRENT_TIME;
881         struct rbd_request *req_data;
882         struct ceph_osd_request_head *reqhead;
883         struct ceph_osd_client *osdc;
884
885         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
886         if (!req_data) {
887                 if (coll)
888                         rbd_coll_end_req_index(rq, coll, coll_index,
889                                                -ENOMEM, len);
890                 return -ENOMEM;
891         }
892
893         if (coll) {
894                 req_data->coll = coll;
895                 req_data->coll_index = coll_index;
896         }
897
898         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
899
900         down_read(&dev->header_rwsem);
901
902         osdc = &dev->rbd_client->client->osdc;
903         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
904                                         false, GFP_NOIO, pages, bio);
905         if (!req) {
906                 up_read(&dev->header_rwsem);
907                 ret = -ENOMEM;
908                 goto done_pages;
909         }
910
911         req->r_callback = rbd_cb;
912
913         req_data->rq = rq;
914         req_data->bio = bio;
915         req_data->pages = pages;
916         req_data->len = len;
917
918         req->r_priv = req_data;
919
920         reqhead = req->r_request->front.iov_base;
921         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
922
923         strncpy(req->r_oid, obj, sizeof(req->r_oid));
924         req->r_oid_len = strlen(req->r_oid);
925
926         layout = &req->r_file_layout;
927         memset(layout, 0, sizeof(*layout));
928         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929         layout->fl_stripe_count = cpu_to_le32(1);
930         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931         layout->fl_pg_pool = cpu_to_le32(dev->pool_id);
932         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
933                                 req, ops);
934
935         ceph_osdc_build_request(req, ofs, &len,
936                                 ops,
937                                 snapc,
938                                 &mtime,
939                                 req->r_oid, req->r_oid_len);
940         up_read(&dev->header_rwsem);
941
942         if (linger_req) {
943                 ceph_osdc_set_request_linger(osdc, req);
944                 *linger_req = req;
945         }
946
947         ret = ceph_osdc_start_request(osdc, req, false);
948         if (ret < 0)
949                 goto done_err;
950
951         if (!rbd_cb) {
952                 ret = ceph_osdc_wait_request(osdc, req);
953                 if (ver)
954                         *ver = le64_to_cpu(req->r_reassert_version.version);
955                 dout("reassert_ver=%lld\n",
956                      le64_to_cpu(req->r_reassert_version.version));
957                 ceph_osdc_put_request(req);
958         }
959         return ret;
960
961 done_err:
962         bio_chain_put(req_data->bio);
963         ceph_osdc_put_request(req);
964 done_pages:
965         rbd_coll_end_req(req_data, ret, len);
966         kfree(req_data);
967         return ret;
968 }
969
970 /*
971  * Ceph osd op callback
972  */
973 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
974 {
975         struct rbd_request *req_data = req->r_priv;
976         struct ceph_osd_reply_head *replyhead;
977         struct ceph_osd_op *op;
978         __s32 rc;
979         u64 bytes;
980         int read_op;
981
982         /* parse reply */
983         replyhead = msg->front.iov_base;
984         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
985         op = (void *)(replyhead + 1);
986         rc = le32_to_cpu(replyhead->result);
987         bytes = le64_to_cpu(op->extent.length);
988         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
989
990         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
991
992         if (rc == -ENOENT && read_op) {
993                 zero_bio_chain(req_data->bio, 0);
994                 rc = 0;
995         } else if (rc == 0 && read_op && bytes < req_data->len) {
996                 zero_bio_chain(req_data->bio, bytes);
997                 bytes = req_data->len;
998         }
999
1000         rbd_coll_end_req(req_data, rc, bytes);
1001
1002         if (req_data->bio)
1003                 bio_chain_put(req_data->bio);
1004
1005         ceph_osdc_put_request(req);
1006         kfree(req_data);
1007 }
1008
1009 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1010 {
1011         ceph_osdc_put_request(req);
1012 }
1013
1014 /*
1015  * Do a synchronous ceph osd operation
1016  */
1017 static int rbd_req_sync_op(struct rbd_device *dev,
1018                            struct ceph_snap_context *snapc,
1019                            u64 snapid,
1020                            int opcode,
1021                            int flags,
1022                            struct ceph_osd_req_op *orig_ops,
1023                            int num_reply,
1024                            const char *obj,
1025                            u64 ofs, u64 len,
1026                            char *buf,
1027                            struct ceph_osd_request **linger_req,
1028                            u64 *ver)
1029 {
1030         int ret;
1031         struct page **pages;
1032         int num_pages;
1033         struct ceph_osd_req_op *ops = orig_ops;
1034         u32 payload_len;
1035
1036         num_pages = calc_pages_for(ofs , len);
1037         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1038         if (IS_ERR(pages))
1039                 return PTR_ERR(pages);
1040
1041         if (!orig_ops) {
1042                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1043                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1044                 if (ret < 0)
1045                         goto done;
1046
1047                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1048                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1049                         if (ret < 0)
1050                                 goto done_ops;
1051                 }
1052         }
1053
1054         ret = rbd_do_request(NULL, dev, snapc, snapid,
1055                           obj, ofs, len, NULL,
1056                           pages, num_pages,
1057                           flags,
1058                           ops,
1059                           2,
1060                           NULL, 0,
1061                           NULL,
1062                           linger_req, ver);
1063         if (ret < 0)
1064                 goto done_ops;
1065
1066         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1067                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1068
1069 done_ops:
1070         if (!orig_ops)
1071                 rbd_destroy_ops(ops);
1072 done:
1073         ceph_release_page_vector(pages, num_pages);
1074         return ret;
1075 }
1076
1077 /*
1078  * Do an asynchronous ceph osd operation
1079  */
1080 static int rbd_do_op(struct request *rq,
1081                      struct rbd_device *rbd_dev ,
1082                      struct ceph_snap_context *snapc,
1083                      u64 snapid,
1084                      int opcode, int flags, int num_reply,
1085                      u64 ofs, u64 len,
1086                      struct bio *bio,
1087                      struct rbd_req_coll *coll,
1088                      int coll_index)
1089 {
1090         char *seg_name;
1091         u64 seg_ofs;
1092         u64 seg_len;
1093         int ret;
1094         struct ceph_osd_req_op *ops;
1095         u32 payload_len;
1096
1097         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1098         if (!seg_name)
1099                 return -ENOMEM;
1100
1101         seg_len = rbd_get_segment(&rbd_dev->header,
1102                                   rbd_dev->header.object_prefix,
1103                                   ofs, len,
1104                                   seg_name, &seg_ofs);
1105
1106         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1107
1108         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1109         if (ret < 0)
1110                 goto done;
1111
1112         /* we've taken care of segment sizes earlier when we
1113            cloned the bios. We should never have a segment
1114            truncated at this point */
1115         BUG_ON(seg_len < len);
1116
1117         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1118                              seg_name, seg_ofs, seg_len,
1119                              bio,
1120                              NULL, 0,
1121                              flags,
1122                              ops,
1123                              num_reply,
1124                              coll, coll_index,
1125                              rbd_req_cb, 0, NULL);
1126
1127         rbd_destroy_ops(ops);
1128 done:
1129         kfree(seg_name);
1130         return ret;
1131 }
1132
1133 /*
1134  * Request async osd write
1135  */
1136 static int rbd_req_write(struct request *rq,
1137                          struct rbd_device *rbd_dev,
1138                          struct ceph_snap_context *snapc,
1139                          u64 ofs, u64 len,
1140                          struct bio *bio,
1141                          struct rbd_req_coll *coll,
1142                          int coll_index)
1143 {
1144         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1145                          CEPH_OSD_OP_WRITE,
1146                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1147                          2,
1148                          ofs, len, bio, coll, coll_index);
1149 }
1150
1151 /*
1152  * Request async osd read
1153  */
1154 static int rbd_req_read(struct request *rq,
1155                          struct rbd_device *rbd_dev,
1156                          u64 snapid,
1157                          u64 ofs, u64 len,
1158                          struct bio *bio,
1159                          struct rbd_req_coll *coll,
1160                          int coll_index)
1161 {
1162         return rbd_do_op(rq, rbd_dev, NULL,
1163                          snapid,
1164                          CEPH_OSD_OP_READ,
1165                          CEPH_OSD_FLAG_READ,
1166                          2,
1167                          ofs, len, bio, coll, coll_index);
1168 }
1169
1170 /*
1171  * Request sync osd read
1172  */
1173 static int rbd_req_sync_read(struct rbd_device *dev,
1174                           struct ceph_snap_context *snapc,
1175                           u64 snapid,
1176                           const char *obj,
1177                           u64 ofs, u64 len,
1178                           char *buf,
1179                           u64 *ver)
1180 {
1181         return rbd_req_sync_op(dev, NULL,
1182                                snapid,
1183                                CEPH_OSD_OP_READ,
1184                                CEPH_OSD_FLAG_READ,
1185                                NULL,
1186                                1, obj, ofs, len, buf, NULL, ver);
1187 }
1188
1189 /*
1190  * Request sync osd watch
1191  */
1192 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1193                                    u64 ver,
1194                                    u64 notify_id,
1195                                    const char *obj)
1196 {
1197         struct ceph_osd_req_op *ops;
1198         int ret;
1199
1200         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1201         if (ret < 0)
1202                 return ret;
1203
1204         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1205         ops[0].watch.cookie = notify_id;
1206         ops[0].watch.flag = 0;
1207
1208         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1209                           obj, 0, 0, NULL,
1210                           NULL, 0,
1211                           CEPH_OSD_FLAG_READ,
1212                           ops,
1213                           1,
1214                           NULL, 0,
1215                           rbd_simple_req_cb, 0, NULL);
1216
1217         rbd_destroy_ops(ops);
1218         return ret;
1219 }
1220
1221 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1222 {
1223         struct rbd_device *dev = (struct rbd_device *)data;
1224         int rc;
1225
1226         if (!dev)
1227                 return;
1228
1229         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1230                 notify_id, (int)opcode);
1231         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1232         rc = __rbd_refresh_header(dev);
1233         mutex_unlock(&ctl_mutex);
1234         if (rc)
1235                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236                            " update snaps: %d\n", dev->major, rc);
1237
1238         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1239 }
1240
1241 /*
1242  * Request sync osd watch
1243  */
1244 static int rbd_req_sync_watch(struct rbd_device *dev,
1245                               const char *obj,
1246                               u64 ver)
1247 {
1248         struct ceph_osd_req_op *ops;
1249         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1250
1251         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1252         if (ret < 0)
1253                 return ret;
1254
1255         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1256                                      (void *)dev, &dev->watch_event);
1257         if (ret < 0)
1258                 goto fail;
1259
1260         ops[0].watch.ver = cpu_to_le64(ver);
1261         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1262         ops[0].watch.flag = 1;
1263
1264         ret = rbd_req_sync_op(dev, NULL,
1265                               CEPH_NOSNAP,
1266                               0,
1267                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268                               ops,
1269                               1, obj, 0, 0, NULL,
1270                               &dev->watch_request, NULL);
1271
1272         if (ret < 0)
1273                 goto fail_event;
1274
1275         rbd_destroy_ops(ops);
1276         return 0;
1277
1278 fail_event:
1279         ceph_osdc_cancel_event(dev->watch_event);
1280         dev->watch_event = NULL;
1281 fail:
1282         rbd_destroy_ops(ops);
1283         return ret;
1284 }
1285
1286 /*
1287  * Request sync osd unwatch
1288  */
1289 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1290                                 const char *obj)
1291 {
1292         struct ceph_osd_req_op *ops;
1293
1294         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1295         if (ret < 0)
1296                 return ret;
1297
1298         ops[0].watch.ver = 0;
1299         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1300         ops[0].watch.flag = 0;
1301
1302         ret = rbd_req_sync_op(dev, NULL,
1303                               CEPH_NOSNAP,
1304                               0,
1305                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1306                               ops,
1307                               1, obj, 0, 0, NULL, NULL, NULL);
1308
1309         rbd_destroy_ops(ops);
1310         ceph_osdc_cancel_event(dev->watch_event);
1311         dev->watch_event = NULL;
1312         return ret;
1313 }
1314
1315 struct rbd_notify_info {
1316         struct rbd_device *dev;
1317 };
1318
1319 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1320 {
1321         struct rbd_device *dev = (struct rbd_device *)data;
1322         if (!dev)
1323                 return;
1324
1325         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1326                 notify_id, (int)opcode);
1327 }
1328
1329 /*
1330  * Request sync osd notify
1331  */
1332 static int rbd_req_sync_notify(struct rbd_device *dev,
1333                           const char *obj)
1334 {
1335         struct ceph_osd_req_op *ops;
1336         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1337         struct ceph_osd_event *event;
1338         struct rbd_notify_info info;
1339         int payload_len = sizeof(u32) + sizeof(u32);
1340         int ret;
1341
1342         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1343         if (ret < 0)
1344                 return ret;
1345
1346         info.dev = dev;
1347
1348         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1349                                      (void *)&info, &event);
1350         if (ret < 0)
1351                 goto fail;
1352
1353         ops[0].watch.ver = 1;
1354         ops[0].watch.flag = 1;
1355         ops[0].watch.cookie = event->cookie;
1356         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1357         ops[0].watch.timeout = 12;
1358
1359         ret = rbd_req_sync_op(dev, NULL,
1360                                CEPH_NOSNAP,
1361                                0,
1362                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363                                ops,
1364                                1, obj, 0, 0, NULL, NULL, NULL);
1365         if (ret < 0)
1366                 goto fail_event;
1367
1368         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1369         dout("ceph_osdc_wait_event returned %d\n", ret);
1370         rbd_destroy_ops(ops);
1371         return 0;
1372
1373 fail_event:
1374         ceph_osdc_cancel_event(event);
1375 fail:
1376         rbd_destroy_ops(ops);
1377         return ret;
1378 }
1379
1380 /*
1381  * Request sync osd read
1382  */
1383 static int rbd_req_sync_exec(struct rbd_device *dev,
1384                              const char *obj,
1385                              const char *cls,
1386                              const char *method,
1387                              const char *data,
1388                              int len,
1389                              u64 *ver)
1390 {
1391         struct ceph_osd_req_op *ops;
1392         int cls_len = strlen(cls);
1393         int method_len = strlen(method);
1394         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1395                                     cls_len + method_len + len);
1396         if (ret < 0)
1397                 return ret;
1398
1399         ops[0].cls.class_name = cls;
1400         ops[0].cls.class_len = (__u8)cls_len;
1401         ops[0].cls.method_name = method;
1402         ops[0].cls.method_len = (__u8)method_len;
1403         ops[0].cls.argc = 0;
1404         ops[0].cls.indata = data;
1405         ops[0].cls.indata_len = len;
1406
1407         ret = rbd_req_sync_op(dev, NULL,
1408                                CEPH_NOSNAP,
1409                                0,
1410                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1411                                ops,
1412                                1, obj, 0, 0, NULL, NULL, ver);
1413
1414         rbd_destroy_ops(ops);
1415
1416         dout("cls_exec returned %d\n", ret);
1417         return ret;
1418 }
1419
1420 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1421 {
1422         struct rbd_req_coll *coll =
1423                         kzalloc(sizeof(struct rbd_req_coll) +
1424                                 sizeof(struct rbd_req_status) * num_reqs,
1425                                 GFP_ATOMIC);
1426
1427         if (!coll)
1428                 return NULL;
1429         coll->total = num_reqs;
1430         kref_init(&coll->kref);
1431         return coll;
1432 }
1433
1434 /*
1435  * block device queue callback
1436  */
1437 static void rbd_rq_fn(struct request_queue *q)
1438 {
1439         struct rbd_device *rbd_dev = q->queuedata;
1440         struct request *rq;
1441         struct bio_pair *bp = NULL;
1442
1443         while ((rq = blk_fetch_request(q))) {
1444                 struct bio *bio;
1445                 struct bio *rq_bio, *next_bio = NULL;
1446                 bool do_write;
1447                 int size, op_size = 0;
1448                 u64 ofs;
1449                 int num_segs, cur_seg = 0;
1450                 struct rbd_req_coll *coll;
1451
1452                 /* peek at request from block layer */
1453                 if (!rq)
1454                         break;
1455
1456                 dout("fetched request\n");
1457
1458                 /* filter out block requests we don't understand */
1459                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1460                         __blk_end_request_all(rq, 0);
1461                         continue;
1462                 }
1463
1464                 /* deduce our operation (read, write) */
1465                 do_write = (rq_data_dir(rq) == WRITE);
1466
1467                 size = blk_rq_bytes(rq);
1468                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1469                 rq_bio = rq->bio;
1470                 if (do_write && rbd_dev->read_only) {
1471                         __blk_end_request_all(rq, -EROFS);
1472                         continue;
1473                 }
1474
1475                 spin_unlock_irq(q->queue_lock);
1476
1477                 dout("%s 0x%x bytes at 0x%llx\n",
1478                      do_write ? "write" : "read",
1479                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1480
1481                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1482                 coll = rbd_alloc_coll(num_segs);
1483                 if (!coll) {
1484                         spin_lock_irq(q->queue_lock);
1485                         __blk_end_request_all(rq, -ENOMEM);
1486                         continue;
1487                 }
1488
1489                 do {
1490                         /* a bio clone to be passed down to OSD req */
1491                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1492                         op_size = rbd_get_segment(&rbd_dev->header,
1493                                                   rbd_dev->header.object_prefix,
1494                                                   ofs, size,
1495                                                   NULL, NULL);
1496                         kref_get(&coll->kref);
1497                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1498                                               op_size, GFP_ATOMIC);
1499                         if (!bio) {
1500                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1501                                                        -ENOMEM, op_size);
1502                                 goto next_seg;
1503                         }
1504
1505
1506                         /* init OSD command: write or read */
1507                         if (do_write)
1508                                 rbd_req_write(rq, rbd_dev,
1509                                               rbd_dev->header.snapc,
1510                                               ofs,
1511                                               op_size, bio,
1512                                               coll, cur_seg);
1513                         else
1514                                 rbd_req_read(rq, rbd_dev,
1515                                              rbd_dev->snap_id,
1516                                              ofs,
1517                                              op_size, bio,
1518                                              coll, cur_seg);
1519
1520 next_seg:
1521                         size -= op_size;
1522                         ofs += op_size;
1523
1524                         cur_seg++;
1525                         rq_bio = next_bio;
1526                 } while (size > 0);
1527                 kref_put(&coll->kref, rbd_coll_release);
1528
1529                 if (bp)
1530                         bio_pair_release(bp);
1531                 spin_lock_irq(q->queue_lock);
1532         }
1533 }
1534
1535 /*
1536  * a queue callback. Makes sure that we don't create a bio that spans across
1537  * multiple osd objects. One exception would be with a single page bios,
1538  * which we handle later at bio_chain_clone
1539  */
1540 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1541                           struct bio_vec *bvec)
1542 {
1543         struct rbd_device *rbd_dev = q->queuedata;
1544         unsigned int chunk_sectors;
1545         sector_t sector;
1546         unsigned int bio_sectors;
1547         int max;
1548
1549         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1550         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1551         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1552
1553         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1554                                  + bio_sectors)) << SECTOR_SHIFT;
1555         if (max < 0)
1556                 max = 0; /* bio_add cannot handle a negative return */
1557         if (max <= bvec->bv_len && bio_sectors == 0)
1558                 return bvec->bv_len;
1559         return max;
1560 }
1561
1562 static void rbd_free_disk(struct rbd_device *rbd_dev)
1563 {
1564         struct gendisk *disk = rbd_dev->disk;
1565
1566         if (!disk)
1567                 return;
1568
1569         rbd_header_free(&rbd_dev->header);
1570
1571         if (disk->flags & GENHD_FL_UP)
1572                 del_gendisk(disk);
1573         if (disk->queue)
1574                 blk_cleanup_queue(disk->queue);
1575         put_disk(disk);
1576 }
1577
1578 /*
1579  * reload the ondisk the header 
1580  */
1581 static int rbd_read_header(struct rbd_device *rbd_dev,
1582                            struct rbd_image_header *header)
1583 {
1584         ssize_t rc;
1585         struct rbd_image_header_ondisk *dh;
1586         u32 snap_count = 0;
1587         u64 ver;
1588         size_t len;
1589
1590         /*
1591          * First reads the fixed-size header to determine the number
1592          * of snapshots, then re-reads it, along with all snapshot
1593          * records as well as their stored names.
1594          */
1595         len = sizeof (*dh);
1596         while (1) {
1597                 dh = kmalloc(len, GFP_KERNEL);
1598                 if (!dh)
1599                         return -ENOMEM;
1600
1601                 rc = rbd_req_sync_read(rbd_dev,
1602                                        NULL, CEPH_NOSNAP,
1603                                        rbd_dev->obj_md_name,
1604                                        0, len,
1605                                        (char *)dh, &ver);
1606                 if (rc < 0)
1607                         goto out_dh;
1608
1609                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1610                 if (rc < 0) {
1611                         if (rc == -ENXIO)
1612                                 pr_warning("unrecognized header format"
1613                                            " for image %s", rbd_dev->obj);
1614                         goto out_dh;
1615                 }
1616
1617                 if (snap_count == header->total_snaps)
1618                         break;
1619
1620                 snap_count = header->total_snaps;
1621                 len = sizeof (*dh) +
1622                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1623                         header->snap_names_len;
1624
1625                 rbd_header_free(header);
1626                 kfree(dh);
1627         }
1628         header->obj_version = ver;
1629
1630 out_dh:
1631         kfree(dh);
1632         return rc;
1633 }
1634
1635 /*
1636  * create a snapshot
1637  */
1638 static int rbd_header_add_snap(struct rbd_device *dev,
1639                                const char *snap_name,
1640                                gfp_t gfp_flags)
1641 {
1642         int name_len = strlen(snap_name);
1643         u64 new_snapid;
1644         int ret;
1645         void *data, *p, *e;
1646         u64 ver;
1647         struct ceph_mon_client *monc;
1648
1649         /* we should create a snapshot only if we're pointing at the head */
1650         if (dev->snap_id != CEPH_NOSNAP)
1651                 return -EINVAL;
1652
1653         monc = &dev->rbd_client->client->monc;
1654         ret = ceph_monc_create_snapid(monc, dev->pool_id, &new_snapid);
1655         dout("created snapid=%lld\n", new_snapid);
1656         if (ret < 0)
1657                 return ret;
1658
1659         data = kmalloc(name_len + 16, gfp_flags);
1660         if (!data)
1661                 return -ENOMEM;
1662
1663         p = data;
1664         e = data + name_len + 16;
1665
1666         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1667         ceph_encode_64_safe(&p, e, new_snapid, bad);
1668
1669         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1670                                 data, p - data, &ver);
1671
1672         kfree(data);
1673
1674         if (ret < 0)
1675                 return ret;
1676
1677         down_write(&dev->header_rwsem);
1678         dev->header.snapc->seq = new_snapid;
1679         up_write(&dev->header_rwsem);
1680
1681         return 0;
1682 bad:
1683         return -ERANGE;
1684 }
1685
1686 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1687 {
1688         struct rbd_snap *snap;
1689
1690         while (!list_empty(&rbd_dev->snaps)) {
1691                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1692                 __rbd_remove_snap_dev(rbd_dev, snap);
1693         }
1694 }
1695
1696 /*
1697  * only read the first part of the ondisk header, without the snaps info
1698  */
1699 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1700 {
1701         int ret;
1702         struct rbd_image_header h;
1703         u64 snap_seq;
1704         int follow_seq = 0;
1705
1706         ret = rbd_read_header(rbd_dev, &h);
1707         if (ret < 0)
1708                 return ret;
1709
1710         /* resized? */
1711         set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1712
1713         down_write(&rbd_dev->header_rwsem);
1714
1715         snap_seq = rbd_dev->header.snapc->seq;
1716         if (rbd_dev->header.total_snaps &&
1717             rbd_dev->header.snapc->snaps[0] == snap_seq)
1718                 /* pointing at the head, will need to follow that
1719                    if head moves */
1720                 follow_seq = 1;
1721
1722         /* rbd_dev->header.object_prefix shouldn't change */
1723         kfree(rbd_dev->header.snap_sizes);
1724         kfree(rbd_dev->header.snap_names);
1725         kfree(rbd_dev->header.snapc);
1726
1727         rbd_dev->header.total_snaps = h.total_snaps;
1728         rbd_dev->header.snapc = h.snapc;
1729         rbd_dev->header.snap_names = h.snap_names;
1730         rbd_dev->header.snap_names_len = h.snap_names_len;
1731         rbd_dev->header.snap_sizes = h.snap_sizes;
1732         /* Free the extra copy of the object prefix */
1733         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1734         kfree(h.object_prefix);
1735
1736         if (follow_seq)
1737                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1738         else
1739                 rbd_dev->header.snapc->seq = snap_seq;
1740
1741         ret = __rbd_init_snaps_header(rbd_dev);
1742
1743         up_write(&rbd_dev->header_rwsem);
1744
1745         return ret;
1746 }
1747
1748 static int rbd_init_disk(struct rbd_device *rbd_dev)
1749 {
1750         struct gendisk *disk;
1751         struct request_queue *q;
1752         int rc;
1753         u64 segment_size;
1754         u64 total_size = 0;
1755
1756         /* contact OSD, request size info about the object being mapped */
1757         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1758         if (rc)
1759                 return rc;
1760
1761         /* no need to lock here, as rbd_dev is not registered yet */
1762         rc = __rbd_init_snaps_header(rbd_dev);
1763         if (rc)
1764                 return rc;
1765
1766         rc = rbd_header_set_snap(rbd_dev, &total_size);
1767         if (rc)
1768                 return rc;
1769
1770         /* create gendisk info */
1771         rc = -ENOMEM;
1772         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1773         if (!disk)
1774                 goto out;
1775
1776         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1777                  rbd_dev->id);
1778         disk->major = rbd_dev->major;
1779         disk->first_minor = 0;
1780         disk->fops = &rbd_bd_ops;
1781         disk->private_data = rbd_dev;
1782
1783         /* init rq */
1784         rc = -ENOMEM;
1785         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1786         if (!q)
1787                 goto out_disk;
1788
1789         /* We use the default size, but let's be explicit about it. */
1790         blk_queue_physical_block_size(q, SECTOR_SIZE);
1791
1792         /* set io sizes to object size */
1793         segment_size = rbd_obj_bytes(&rbd_dev->header);
1794         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1795         blk_queue_max_segment_size(q, segment_size);
1796         blk_queue_io_min(q, segment_size);
1797         blk_queue_io_opt(q, segment_size);
1798
1799         blk_queue_merge_bvec(q, rbd_merge_bvec);
1800         disk->queue = q;
1801
1802         q->queuedata = rbd_dev;
1803
1804         rbd_dev->disk = disk;
1805         rbd_dev->q = q;
1806
1807         /* finally, announce the disk to the world */
1808         set_capacity(disk, total_size / SECTOR_SIZE);
1809         add_disk(disk);
1810
1811         pr_info("%s: added with size 0x%llx\n",
1812                 disk->disk_name, (unsigned long long)total_size);
1813         return 0;
1814
1815 out_disk:
1816         put_disk(disk);
1817 out:
1818         return rc;
1819 }
1820
1821 /*
1822   sysfs
1823 */
1824
1825 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1826 {
1827         return container_of(dev, struct rbd_device, dev);
1828 }
1829
1830 static ssize_t rbd_size_show(struct device *dev,
1831                              struct device_attribute *attr, char *buf)
1832 {
1833         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1834
1835         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1836 }
1837
1838 static ssize_t rbd_major_show(struct device *dev,
1839                               struct device_attribute *attr, char *buf)
1840 {
1841         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1842
1843         return sprintf(buf, "%d\n", rbd_dev->major);
1844 }
1845
1846 static ssize_t rbd_client_id_show(struct device *dev,
1847                                   struct device_attribute *attr, char *buf)
1848 {
1849         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1850
1851         return sprintf(buf, "client%lld\n",
1852                         ceph_client_id(rbd_dev->rbd_client->client));
1853 }
1854
1855 static ssize_t rbd_pool_show(struct device *dev,
1856                              struct device_attribute *attr, char *buf)
1857 {
1858         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1859
1860         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1861 }
1862
1863 static ssize_t rbd_pool_id_show(struct device *dev,
1864                              struct device_attribute *attr, char *buf)
1865 {
1866         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1867
1868         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1869 }
1870
1871 static ssize_t rbd_name_show(struct device *dev,
1872                              struct device_attribute *attr, char *buf)
1873 {
1874         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1875
1876         return sprintf(buf, "%s\n", rbd_dev->obj);
1877 }
1878
1879 static ssize_t rbd_snap_show(struct device *dev,
1880                              struct device_attribute *attr,
1881                              char *buf)
1882 {
1883         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1884
1885         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1886 }
1887
1888 static ssize_t rbd_image_refresh(struct device *dev,
1889                                  struct device_attribute *attr,
1890                                  const char *buf,
1891                                  size_t size)
1892 {
1893         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894         int rc;
1895         int ret = size;
1896
1897         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1898
1899         rc = __rbd_refresh_header(rbd_dev);
1900         if (rc < 0)
1901                 ret = rc;
1902
1903         mutex_unlock(&ctl_mutex);
1904         return ret;
1905 }
1906
1907 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1908 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1909 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1910 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1911 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1912 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1913 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1914 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1915 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1916
1917 static struct attribute *rbd_attrs[] = {
1918         &dev_attr_size.attr,
1919         &dev_attr_major.attr,
1920         &dev_attr_client_id.attr,
1921         &dev_attr_pool.attr,
1922         &dev_attr_pool_id.attr,
1923         &dev_attr_name.attr,
1924         &dev_attr_current_snap.attr,
1925         &dev_attr_refresh.attr,
1926         &dev_attr_create_snap.attr,
1927         NULL
1928 };
1929
1930 static struct attribute_group rbd_attr_group = {
1931         .attrs = rbd_attrs,
1932 };
1933
1934 static const struct attribute_group *rbd_attr_groups[] = {
1935         &rbd_attr_group,
1936         NULL
1937 };
1938
1939 static void rbd_sysfs_dev_release(struct device *dev)
1940 {
1941 }
1942
1943 static struct device_type rbd_device_type = {
1944         .name           = "rbd",
1945         .groups         = rbd_attr_groups,
1946         .release        = rbd_sysfs_dev_release,
1947 };
1948
1949
1950 /*
1951   sysfs - snapshots
1952 */
1953
1954 static ssize_t rbd_snap_size_show(struct device *dev,
1955                                   struct device_attribute *attr,
1956                                   char *buf)
1957 {
1958         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1959
1960         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1961 }
1962
1963 static ssize_t rbd_snap_id_show(struct device *dev,
1964                                 struct device_attribute *attr,
1965                                 char *buf)
1966 {
1967         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1968
1969         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1970 }
1971
1972 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1973 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1974
1975 static struct attribute *rbd_snap_attrs[] = {
1976         &dev_attr_snap_size.attr,
1977         &dev_attr_snap_id.attr,
1978         NULL,
1979 };
1980
1981 static struct attribute_group rbd_snap_attr_group = {
1982         .attrs = rbd_snap_attrs,
1983 };
1984
1985 static void rbd_snap_dev_release(struct device *dev)
1986 {
1987         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1988         kfree(snap->name);
1989         kfree(snap);
1990 }
1991
1992 static const struct attribute_group *rbd_snap_attr_groups[] = {
1993         &rbd_snap_attr_group,
1994         NULL
1995 };
1996
1997 static struct device_type rbd_snap_device_type = {
1998         .groups         = rbd_snap_attr_groups,
1999         .release        = rbd_snap_dev_release,
2000 };
2001
2002 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2003                                   struct rbd_snap *snap)
2004 {
2005         list_del(&snap->node);
2006         device_unregister(&snap->dev);
2007 }
2008
2009 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2010                                   struct rbd_snap *snap,
2011                                   struct device *parent)
2012 {
2013         struct device *dev = &snap->dev;
2014         int ret;
2015
2016         dev->type = &rbd_snap_device_type;
2017         dev->parent = parent;
2018         dev->release = rbd_snap_dev_release;
2019         dev_set_name(dev, "snap_%s", snap->name);
2020         ret = device_register(dev);
2021
2022         return ret;
2023 }
2024
2025 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2026                               int i, const char *name,
2027                               struct rbd_snap **snapp)
2028 {
2029         int ret;
2030         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2031         if (!snap)
2032                 return -ENOMEM;
2033         snap->name = kstrdup(name, GFP_KERNEL);
2034         snap->size = rbd_dev->header.snap_sizes[i];
2035         snap->id = rbd_dev->header.snapc->snaps[i];
2036         if (device_is_registered(&rbd_dev->dev)) {
2037                 ret = rbd_register_snap_dev(rbd_dev, snap,
2038                                              &rbd_dev->dev);
2039                 if (ret < 0)
2040                         goto err;
2041         }
2042         *snapp = snap;
2043         return 0;
2044 err:
2045         kfree(snap->name);
2046         kfree(snap);
2047         return ret;
2048 }
2049
2050 /*
2051  * search for the previous snap in a null delimited string list
2052  */
2053 const char *rbd_prev_snap_name(const char *name, const char *start)
2054 {
2055         if (name < start + 2)
2056                 return NULL;
2057
2058         name -= 2;
2059         while (*name) {
2060                 if (name == start)
2061                         return start;
2062                 name--;
2063         }
2064         return name + 1;
2065 }
2066
2067 /*
2068  * compare the old list of snapshots that we have to what's in the header
2069  * and update it accordingly. Note that the header holds the snapshots
2070  * in a reverse order (from newest to oldest) and we need to go from
2071  * older to new so that we don't get a duplicate snap name when
2072  * doing the process (e.g., removed snapshot and recreated a new
2073  * one with the same name.
2074  */
2075 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2076 {
2077         const char *name, *first_name;
2078         int i = rbd_dev->header.total_snaps;
2079         struct rbd_snap *snap, *old_snap = NULL;
2080         int ret;
2081         struct list_head *p, *n;
2082
2083         first_name = rbd_dev->header.snap_names;
2084         name = first_name + rbd_dev->header.snap_names_len;
2085
2086         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2087                 u64 cur_id;
2088
2089                 old_snap = list_entry(p, struct rbd_snap, node);
2090
2091                 if (i)
2092                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2093
2094                 if (!i || old_snap->id < cur_id) {
2095                         /* old_snap->id was skipped, thus was removed */
2096                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2097                         continue;
2098                 }
2099                 if (old_snap->id == cur_id) {
2100                         /* we have this snapshot already */
2101                         i--;
2102                         name = rbd_prev_snap_name(name, first_name);
2103                         continue;
2104                 }
2105                 for (; i > 0;
2106                      i--, name = rbd_prev_snap_name(name, first_name)) {
2107                         if (!name) {
2108                                 WARN_ON(1);
2109                                 return -EINVAL;
2110                         }
2111                         cur_id = rbd_dev->header.snapc->snaps[i];
2112                         /* snapshot removal? handle it above */
2113                         if (cur_id >= old_snap->id)
2114                                 break;
2115                         /* a new snapshot */
2116                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2117                         if (ret < 0)
2118                                 return ret;
2119
2120                         /* note that we add it backward so using n and not p */
2121                         list_add(&snap->node, n);
2122                         p = &snap->node;
2123                 }
2124         }
2125         /* we're done going over the old snap list, just add what's left */
2126         for (; i > 0; i--) {
2127                 name = rbd_prev_snap_name(name, first_name);
2128                 if (!name) {
2129                         WARN_ON(1);
2130                         return -EINVAL;
2131                 }
2132                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2133                 if (ret < 0)
2134                         return ret;
2135                 list_add(&snap->node, &rbd_dev->snaps);
2136         }
2137
2138         return 0;
2139 }
2140
2141 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2142 {
2143         int ret;
2144         struct device *dev;
2145         struct rbd_snap *snap;
2146
2147         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2148         dev = &rbd_dev->dev;
2149
2150         dev->bus = &rbd_bus_type;
2151         dev->type = &rbd_device_type;
2152         dev->parent = &rbd_root_dev;
2153         dev->release = rbd_dev_release;
2154         dev_set_name(dev, "%d", rbd_dev->id);
2155         ret = device_register(dev);
2156         if (ret < 0)
2157                 goto out;
2158
2159         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2160                 ret = rbd_register_snap_dev(rbd_dev, snap,
2161                                              &rbd_dev->dev);
2162                 if (ret < 0)
2163                         break;
2164         }
2165 out:
2166         mutex_unlock(&ctl_mutex);
2167         return ret;
2168 }
2169
2170 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2171 {
2172         device_unregister(&rbd_dev->dev);
2173 }
2174
2175 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2176 {
2177         int ret, rc;
2178
2179         do {
2180                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2181                                          rbd_dev->header.obj_version);
2182                 if (ret == -ERANGE) {
2183                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2184                         rc = __rbd_refresh_header(rbd_dev);
2185                         mutex_unlock(&ctl_mutex);
2186                         if (rc < 0)
2187                                 return rc;
2188                 }
2189         } while (ret == -ERANGE);
2190
2191         return ret;
2192 }
2193
2194 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2195
2196 /*
2197  * Get a unique rbd identifier for the given new rbd_dev, and add
2198  * the rbd_dev to the global list.  The minimum rbd id is 1.
2199  */
2200 static void rbd_id_get(struct rbd_device *rbd_dev)
2201 {
2202         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2203
2204         spin_lock(&rbd_dev_list_lock);
2205         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2206         spin_unlock(&rbd_dev_list_lock);
2207 }
2208
2209 /*
2210  * Remove an rbd_dev from the global list, and record that its
2211  * identifier is no longer in use.
2212  */
2213 static void rbd_id_put(struct rbd_device *rbd_dev)
2214 {
2215         struct list_head *tmp;
2216         int rbd_id = rbd_dev->id;
2217         int max_id;
2218
2219         BUG_ON(rbd_id < 1);
2220
2221         spin_lock(&rbd_dev_list_lock);
2222         list_del_init(&rbd_dev->node);
2223
2224         /*
2225          * If the id being "put" is not the current maximum, there
2226          * is nothing special we need to do.
2227          */
2228         if (rbd_id != atomic64_read(&rbd_id_max)) {
2229                 spin_unlock(&rbd_dev_list_lock);
2230                 return;
2231         }
2232
2233         /*
2234          * We need to update the current maximum id.  Search the
2235          * list to find out what it is.  We're more likely to find
2236          * the maximum at the end, so search the list backward.
2237          */
2238         max_id = 0;
2239         list_for_each_prev(tmp, &rbd_dev_list) {
2240                 struct rbd_device *rbd_dev;
2241
2242                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2243                 if (rbd_id > max_id)
2244                         max_id = rbd_id;
2245         }
2246         spin_unlock(&rbd_dev_list_lock);
2247
2248         /*
2249          * The max id could have been updated by rbd_id_get(), in
2250          * which case it now accurately reflects the new maximum.
2251          * Be careful not to overwrite the maximum value in that
2252          * case.
2253          */
2254         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2255 }
2256
2257 /*
2258  * Skips over white space at *buf, and updates *buf to point to the
2259  * first found non-space character (if any). Returns the length of
2260  * the token (string of non-white space characters) found.  Note
2261  * that *buf must be terminated with '\0'.
2262  */
2263 static inline size_t next_token(const char **buf)
2264 {
2265         /*
2266         * These are the characters that produce nonzero for
2267         * isspace() in the "C" and "POSIX" locales.
2268         */
2269         const char *spaces = " \f\n\r\t\v";
2270
2271         *buf += strspn(*buf, spaces);   /* Find start of token */
2272
2273         return strcspn(*buf, spaces);   /* Return token length */
2274 }
2275
2276 /*
2277  * Finds the next token in *buf, and if the provided token buffer is
2278  * big enough, copies the found token into it.  The result, if
2279  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2280  * must be terminated with '\0' on entry.
2281  *
2282  * Returns the length of the token found (not including the '\0').
2283  * Return value will be 0 if no token is found, and it will be >=
2284  * token_size if the token would not fit.
2285  *
2286  * The *buf pointer will be updated to point beyond the end of the
2287  * found token.  Note that this occurs even if the token buffer is
2288  * too small to hold it.
2289  */
2290 static inline size_t copy_token(const char **buf,
2291                                 char *token,
2292                                 size_t token_size)
2293 {
2294         size_t len;
2295
2296         len = next_token(buf);
2297         if (len < token_size) {
2298                 memcpy(token, *buf, len);
2299                 *(token + len) = '\0';
2300         }
2301         *buf += len;
2302
2303         return len;
2304 }
2305
2306 /*
2307  * Finds the next token in *buf, dynamically allocates a buffer big
2308  * enough to hold a copy of it, and copies the token into the new
2309  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2310  * that a duplicate buffer is created even for a zero-length token.
2311  *
2312  * Returns a pointer to the newly-allocated duplicate, or a null
2313  * pointer if memory for the duplicate was not available.  If
2314  * the lenp argument is a non-null pointer, the length of the token
2315  * (not including the '\0') is returned in *lenp.
2316  *
2317  * If successful, the *buf pointer will be updated to point beyond
2318  * the end of the found token.
2319  *
2320  * Note: uses GFP_KERNEL for allocation.
2321  */
2322 static inline char *dup_token(const char **buf, size_t *lenp)
2323 {
2324         char *dup;
2325         size_t len;
2326
2327         len = next_token(buf);
2328         dup = kmalloc(len + 1, GFP_KERNEL);
2329         if (!dup)
2330                 return NULL;
2331
2332         memcpy(dup, *buf, len);
2333         *(dup + len) = '\0';
2334         *buf += len;
2335
2336         if (lenp)
2337                 *lenp = len;
2338
2339         return dup;
2340 }
2341
2342 /*
2343  * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2344  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2345  * on the list of monitor addresses and other options provided via
2346  * /sys/bus/rbd/add.
2347  *
2348  * Note: rbd_dev is assumed to have been initially zero-filled.
2349  */
2350 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2351                               const char *buf,
2352                               const char **mon_addrs,
2353                               size_t *mon_addrs_size,
2354                               char *options,
2355                               size_t options_size)
2356 {
2357         size_t len;
2358         int ret;
2359
2360         /* The first four tokens are required */
2361
2362         len = next_token(&buf);
2363         if (!len)
2364                 return -EINVAL;
2365         *mon_addrs_size = len + 1;
2366         *mon_addrs = buf;
2367
2368         buf += len;
2369
2370         len = copy_token(&buf, options, options_size);
2371         if (!len || len >= options_size)
2372                 return -EINVAL;
2373
2374         ret = -ENOMEM;
2375         rbd_dev->pool_name = dup_token(&buf, NULL);
2376         if (!rbd_dev->pool_name)
2377                 goto out_err;
2378
2379         rbd_dev->obj = dup_token(&buf, &rbd_dev->obj_len);
2380         if (!rbd_dev->obj)
2381                 goto out_err;
2382
2383         /* Create the name of the header object */
2384
2385         rbd_dev->obj_md_name = kmalloc(rbd_dev->obj_len
2386                                                 + sizeof (RBD_SUFFIX),
2387                                         GFP_KERNEL);
2388         if (!rbd_dev->obj_md_name)
2389                 goto out_err;
2390         sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2391
2392         /*
2393          * The snapshot name is optional, but it's an error if it's
2394          * too long.  If no snapshot is supplied, fill in the default.
2395          */
2396         len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2397         if (!len)
2398                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2399                         sizeof (RBD_SNAP_HEAD_NAME));
2400         else if (len >= sizeof (rbd_dev->snap_name)) {
2401                 ret = -EINVAL;
2402                 goto out_err;
2403         }
2404
2405         return 0;
2406
2407 out_err:
2408         kfree(rbd_dev->obj_md_name);
2409         kfree(rbd_dev->obj);
2410         kfree(rbd_dev->pool_name);
2411         rbd_dev->pool_name = NULL;
2412
2413         return ret;
2414 }
2415
2416 static ssize_t rbd_add(struct bus_type *bus,
2417                        const char *buf,
2418                        size_t count)
2419 {
2420         char *options;
2421         struct rbd_device *rbd_dev = NULL;
2422         const char *mon_addrs = NULL;
2423         size_t mon_addrs_size = 0;
2424         struct ceph_osd_client *osdc;
2425         int rc = -ENOMEM;
2426
2427         if (!try_module_get(THIS_MODULE))
2428                 return -ENODEV;
2429
2430         options = kmalloc(count, GFP_KERNEL);
2431         if (!options)
2432                 goto err_nomem;
2433         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2434         if (!rbd_dev)
2435                 goto err_nomem;
2436
2437         /* static rbd_device initialization */
2438         spin_lock_init(&rbd_dev->lock);
2439         INIT_LIST_HEAD(&rbd_dev->node);
2440         INIT_LIST_HEAD(&rbd_dev->snaps);
2441         init_rwsem(&rbd_dev->header_rwsem);
2442
2443         init_rwsem(&rbd_dev->header_rwsem);
2444
2445         /* generate unique id: find highest unique id, add one */
2446         rbd_id_get(rbd_dev);
2447
2448         /* Fill in the device name, now that we have its id. */
2449         BUILD_BUG_ON(DEV_NAME_LEN
2450                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2451         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2452
2453         /* parse add command */
2454         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2455                                 options, count);
2456         if (rc)
2457                 goto err_put_id;
2458
2459         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2460                                                 options);
2461         if (IS_ERR(rbd_dev->rbd_client)) {
2462                 rc = PTR_ERR(rbd_dev->rbd_client);
2463                 goto err_put_id;
2464         }
2465
2466         /* pick the pool */
2467         osdc = &rbd_dev->rbd_client->client->osdc;
2468         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2469         if (rc < 0)
2470                 goto err_out_client;
2471         rbd_dev->pool_id = rc;
2472
2473         /* register our block device */
2474         rc = register_blkdev(0, rbd_dev->name);
2475         if (rc < 0)
2476                 goto err_out_client;
2477         rbd_dev->major = rc;
2478
2479         rc = rbd_bus_add_dev(rbd_dev);
2480         if (rc)
2481                 goto err_out_blkdev;
2482
2483         /*
2484          * At this point cleanup in the event of an error is the job
2485          * of the sysfs code (initiated by rbd_bus_del_dev()).
2486          *
2487          * Set up and announce blkdev mapping.
2488          */
2489         rc = rbd_init_disk(rbd_dev);
2490         if (rc)
2491                 goto err_out_bus;
2492
2493         rc = rbd_init_watch_dev(rbd_dev);
2494         if (rc)
2495                 goto err_out_bus;
2496
2497         return count;
2498
2499 err_out_bus:
2500         /* this will also clean up rest of rbd_dev stuff */
2501
2502         rbd_bus_del_dev(rbd_dev);
2503         kfree(options);
2504         return rc;
2505
2506 err_out_blkdev:
2507         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2508 err_out_client:
2509         rbd_put_client(rbd_dev);
2510 err_put_id:
2511         if (rbd_dev->pool_name) {
2512                 kfree(rbd_dev->obj_md_name);
2513                 kfree(rbd_dev->obj);
2514                 kfree(rbd_dev->pool_name);
2515         }
2516         rbd_id_put(rbd_dev);
2517 err_nomem:
2518         kfree(rbd_dev);
2519         kfree(options);
2520
2521         dout("Error adding device %s\n", buf);
2522         module_put(THIS_MODULE);
2523
2524         return (ssize_t) rc;
2525 }
2526
2527 static struct rbd_device *__rbd_get_dev(unsigned long id)
2528 {
2529         struct list_head *tmp;
2530         struct rbd_device *rbd_dev;
2531
2532         spin_lock(&rbd_dev_list_lock);
2533         list_for_each(tmp, &rbd_dev_list) {
2534                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2535                 if (rbd_dev->id == id) {
2536                         spin_unlock(&rbd_dev_list_lock);
2537                         return rbd_dev;
2538                 }
2539         }
2540         spin_unlock(&rbd_dev_list_lock);
2541         return NULL;
2542 }
2543
2544 static void rbd_dev_release(struct device *dev)
2545 {
2546         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2547
2548         if (rbd_dev->watch_request) {
2549                 struct ceph_client *client = rbd_dev->rbd_client->client;
2550
2551                 ceph_osdc_unregister_linger_request(&client->osdc,
2552                                                     rbd_dev->watch_request);
2553         }
2554         if (rbd_dev->watch_event)
2555                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2556
2557         rbd_put_client(rbd_dev);
2558
2559         /* clean up and free blkdev */
2560         rbd_free_disk(rbd_dev);
2561         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2562
2563         /* done with the id, and with the rbd_dev */
2564         kfree(rbd_dev->obj_md_name);
2565         kfree(rbd_dev->pool_name);
2566         kfree(rbd_dev->obj);
2567         rbd_id_put(rbd_dev);
2568         kfree(rbd_dev);
2569
2570         /* release module ref */
2571         module_put(THIS_MODULE);
2572 }
2573
2574 static ssize_t rbd_remove(struct bus_type *bus,
2575                           const char *buf,
2576                           size_t count)
2577 {
2578         struct rbd_device *rbd_dev = NULL;
2579         int target_id, rc;
2580         unsigned long ul;
2581         int ret = count;
2582
2583         rc = strict_strtoul(buf, 10, &ul);
2584         if (rc)
2585                 return rc;
2586
2587         /* convert to int; abort if we lost anything in the conversion */
2588         target_id = (int) ul;
2589         if (target_id != ul)
2590                 return -EINVAL;
2591
2592         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2593
2594         rbd_dev = __rbd_get_dev(target_id);
2595         if (!rbd_dev) {
2596                 ret = -ENOENT;
2597                 goto done;
2598         }
2599
2600         __rbd_remove_all_snaps(rbd_dev);
2601         rbd_bus_del_dev(rbd_dev);
2602
2603 done:
2604         mutex_unlock(&ctl_mutex);
2605         return ret;
2606 }
2607
2608 static ssize_t rbd_snap_add(struct device *dev,
2609                             struct device_attribute *attr,
2610                             const char *buf,
2611                             size_t count)
2612 {
2613         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2614         int ret;
2615         char *name = kmalloc(count + 1, GFP_KERNEL);
2616         if (!name)
2617                 return -ENOMEM;
2618
2619         snprintf(name, count, "%s", buf);
2620
2621         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2622
2623         ret = rbd_header_add_snap(rbd_dev,
2624                                   name, GFP_KERNEL);
2625         if (ret < 0)
2626                 goto err_unlock;
2627
2628         ret = __rbd_refresh_header(rbd_dev);
2629         if (ret < 0)
2630                 goto err_unlock;
2631
2632         /* shouldn't hold ctl_mutex when notifying.. notify might
2633            trigger a watch callback that would need to get that mutex */
2634         mutex_unlock(&ctl_mutex);
2635
2636         /* make a best effort, don't error if failed */
2637         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2638
2639         ret = count;
2640         kfree(name);
2641         return ret;
2642
2643 err_unlock:
2644         mutex_unlock(&ctl_mutex);
2645         kfree(name);
2646         return ret;
2647 }
2648
2649 /*
2650  * create control files in sysfs
2651  * /sys/bus/rbd/...
2652  */
2653 static int rbd_sysfs_init(void)
2654 {
2655         int ret;
2656
2657         ret = device_register(&rbd_root_dev);
2658         if (ret < 0)
2659                 return ret;
2660
2661         ret = bus_register(&rbd_bus_type);
2662         if (ret < 0)
2663                 device_unregister(&rbd_root_dev);
2664
2665         return ret;
2666 }
2667
2668 static void rbd_sysfs_cleanup(void)
2669 {
2670         bus_unregister(&rbd_bus_type);
2671         device_unregister(&rbd_root_dev);
2672 }
2673
2674 int __init rbd_init(void)
2675 {
2676         int rc;
2677
2678         rc = rbd_sysfs_init();
2679         if (rc)
2680                 return rc;
2681         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2682         return 0;
2683 }
2684
2685 void __exit rbd_exit(void)
2686 {
2687         rbd_sysfs_cleanup();
2688 }
2689
2690 module_init(rbd_init);
2691 module_exit(rbd_exit);
2692
2693 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2694 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2695 MODULE_DESCRIPTION("rados block device");
2696
2697 /* following authorship retained from original osdblk.c */
2698 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2699
2700 MODULE_LICENSE("GPL");