]> git.karo-electronics.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
rbd: pass null version pointer in add_snap()
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Driver name, as registered with the block layer and used in logs */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Name used to request the "head" (unsnapshotted) version of an image */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
/* 5/2 decimal digits per byte over-approximates log10(256), +1 for sign */
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Default timeout (seconds) for notify operations */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
/*
 * block device image metadata (in-memory version)
 *
 * Translated from the on-disk header by rbd_header_from_disk() and
 * released by rbd_header_free().
 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes */
	char *object_prefix;	/* NUL-terminated data object name prefix */
	__u8 obj_order;		/* objects are 2^obj_order bytes */
	__u8 crypt_type;	/* copied from on-disk header options */
	__u8 comp_type;		/* copied from on-disk header options */
	struct ceph_snap_context *snapc;	/* snap ids; refcounted */
	size_t snap_names_len;	/* total bytes in snap_names buffer */
	u32 total_snaps;	/* number of snapshots */

	char *snap_names;	/* consecutive NUL-terminated names */
	u64 *snap_sizes;	/* per-snapshot image size, in bytes */

	u64 obj_version;	/* version of the header object read */
};
92
/* rbd-specific options parsed out of the mount-style options string */
struct rbd_options {
	int	notify_timeout;		/* seconds; see Opt_notify_timeout */
};
96
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* ceph cluster connection */
	struct rbd_options	*rbd_opts;	/* owned; kfree'd on release */
	struct kref		kref;		/* freed via rbd_client_release() */
	struct list_head	node;		/* entry in rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request completed */
	int rc;		/* result code of the sub-request */
	u64 bytes;	/* bytes transferred */
};
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* length of this request, bytes */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* parent collection, if any */
};
137
/* In-core representation of one snapshot; exposed as a sysfs device. */
struct rbd_snap {
	struct	device		dev;	/* sysfs device */
	const char		*name;	/* snapshot name */
	u64			size;	/* image size at snapshot time */
	struct list_head	node;	/* entry in rbd_dev->snaps */
	u64			id;	/* snapshot id */
};
145
/*
 * a single device
 *
 * One mapped rbd image, i.e. one /dev/rbdN block device.
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;		/* block request queue */

	struct rbd_client	*rbd_client;	/* possibly shared client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-core image header */
	char			*image_name;	/* rbd image name */
	size_t			image_name_len;	/* strlen(image_name) */
	char			*header_name;	/* header object name */
	char			*pool_name;	/* rados pool name */
	int			pool_id;	/* rados pool id */

	struct ceph_osd_event	*watch_event;	/* header-change watch */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	/* name of the snapshot this device reads from */
	char			*snap_name;
	/* id of the snapshot this device reads from */
	u64			snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool			snap_exists;
	int			read_only;	/* nonzero for snapshot maps */

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
190
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for snapshot/sysfs helpers defined later */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

/* Bus-level write-only attributes used to map ("add") and unmap
 * ("remove") images; see Documentation/ABI/testing/sysfs-bus-rbd. */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
222
/* Nothing to free for the root device, but a release callback is
 * mandatory for any struct device. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent device of all rbd devices (/sys/devices/rbd) */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261         struct rbd_device *rbd_dev = disk->private_data;
262
263         rbd_put_dev(rbd_dev);
264
265         return 0;
266 }
267
/* Block device operations; rbd only implements open and release. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         list_for_each_entry(client_node, &rbd_client_list, node)
336                 if (!ceph_compare_options(ceph_opts, client_node->client))
337                         return client_node;
338         return NULL;
339 }
340
/*
 * mount options
 *
 * Tokens below Opt_last_int take an integer argument; tokens between
 * Opt_last_int and Opt_last_string take a string argument.  See
 * parse_rbd_opts_token().
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
358
/*
 * Parse a single "key=value" option token into *private (a struct
 * rbd_options).  Used as the callback for ceph_parse_options().
 * Returns 0 on success or a negative errno for a bad/unknown token.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* The token's position in the enum encodes its argument type */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		/* match_token() succeeded, so this should be unreachable */
		BUG_ON(token);
	}
	return 0;
}
393
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * Returns a referenced rbd_client (existing or newly created) or an
 * ERR_PTR.  Ownership: the parsed ceph options end up owned by the
 * client (or are destroyed here when an existing client is reused);
 * rbd_opts is owned by a newly created client, and freed here in all
 * other cases.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	/* rbd-specific tokens are routed to parse_rbd_opts_token() */
	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	/* rbd_client_create() consumed ceph_opts but not rbd_opts */
	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
441
/*
 * Destroy ceph client.  Invoked via kref_put() when the last
 * reference is dropped.  Takes rbd_client_list_lock itself to unlink
 * the client, so the caller must NOT hold that lock.
 */
447 static void rbd_client_release(struct kref *kref)
448 {
449         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450
451         dout("rbd_release_client %p\n", rbdc);
452         spin_lock(&rbd_client_list_lock);
453         list_del(&rbdc->node);
454         spin_unlock(&rbd_client_list_lock);
455
456         ceph_destroy_client(rbdc->client);
457         kfree(rbdc->rbd_opts);
458         kfree(rbdc);
459 }
460
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         rbd_dev->rbd_client = NULL;
469 }
470
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * @allocated_snaps is the number of snapshot slots the caller's
 * on-disk buffer was read with.  When it differs from the on-disk
 * snap count, the snapshot arrays are allocated but left unfilled;
 * presumably the caller re-reads the header with the correct count
 * and calls again -- TODO confirm at the call sites.
 *
 * Returns 0 on success, -ENXIO for an unrecognized header, -EINVAL
 * for an implausible snapshot count, or -ENOMEM on allocation
 * failure (all partial allocations are unwound).
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps)
{
	u32 i, snap_count;

	if (!rbd_dev_ondisk_valid(ondisk))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/*
	 * Overflow guard for the snap context allocation below.
	 * NOTE(review): the divisor is sizeof (*ondisk) rather than the
	 * sizeof(u64) actually allocated per snap -- conservative (it
	 * only rejects more), but looks unintended; confirm.
	 */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				GFP_KERNEL);
	if (!header->snapc)
		return -ENOMEM;

	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     GFP_KERNEL);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* NUL-terminated copy of the fixed-size on-disk name prefix */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					GFP_KERNEL);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Initialize the snap context; refcount starts at 1 */
	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
569
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571                         u64 *seq, u64 *size)
572 {
573         int i;
574         char *p = header->snap_names;
575
576         for (i = 0; i < header->total_snaps; i++) {
577                 if (!strcmp(snap_name, p)) {
578
579                         /* Found it.  Pass back its id and/or size */
580
581                         if (seq)
582                                 *seq = header->snapc->snaps[i];
583                         if (size)
584                                 *size = header->snap_sizes[i];
585                         return i;
586                 }
587                 p += strlen(p) + 1;     /* Skip ahead to the next name */
588         }
589         return -ENOENT;
590 }
591
/*
 * Point the device at the version named by rbd_dev->snap_name: either
 * the writable head (RBD_SNAP_HEAD_NAME) or a specific, read-only
 * snapshot.  When @size is non-NULL it receives the image size for
 * that version.
 *
 * Returns 0 on success or a negative errno (e.g. -ENOENT if the
 * named snapshot does not exist).
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	/* memcmp includes the NUL, so this is an exact match of "-" */
	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
622
623 static void rbd_header_free(struct rbd_image_header *header)
624 {
625         kfree(header->object_prefix);
626         kfree(header->snap_sizes);
627         kfree(header->snap_names);
628         ceph_put_snap_context(header->snapc);
629 }
630
631 /*
632  * get the actual striped segment name, offset and length
633  */
634 static u64 rbd_get_segment(struct rbd_image_header *header,
635                            const char *object_prefix,
636                            u64 ofs, u64 len,
637                            char *seg_name, u64 *segofs)
638 {
639         u64 seg = ofs >> header->obj_order;
640
641         if (seg_name)
642                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
643                          "%s.%012llx", object_prefix, seg);
644
645         ofs = ofs & ((1 << header->obj_order) - 1);
646         len = min_t(u64, len, (1 << header->obj_order) - ofs);
647
648         if (segofs)
649                 *segofs = ofs;
650
651         return len;
652 }
653
654 static int rbd_get_num_segments(struct rbd_image_header *header,
655                                 u64 ofs, u64 len)
656 {
657         u64 start_seg = ofs >> header->obj_order;
658         u64 end_seg = (ofs + len - 1) >> header->obj_order;
659         return end_seg - start_seg + 1;
660 }
661
662 /*
663  * returns the size of an object in the image
664  */
665 static u64 rbd_obj_bytes(struct rbd_image_header *header)
666 {
667         return 1 << header->obj_order;
668 }
669
670 /*
671  * bio helpers
672  */
673
674 static void bio_chain_put(struct bio *chain)
675 {
676         struct bio *tmp;
677
678         while (chain) {
679                 tmp = chain;
680                 chain = chain->bi_next;
681                 bio_put(tmp);
682         }
683 }
684
/*
 * zeros a bio chain, starting at specific offset
 *
 * Zero-fills the data of every bio in @chain from byte offset
 * @start_ofs (relative to the start of the chain) to the end.  Used
 * to blank the tail of short reads and reads of missing objects.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero this segment from start_ofs onward */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
711
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old into a new chain until @len bytes are
 * covered; *old is advanced past the consumed bios and *next is set
 * to the first unconsumed bio (or to the second half of a split when
 * a bio straddles the boundary).  Any previously stored *bp is
 * released on entry.  Returns the new chain, or NULL on failure
 * (partially built chain is put).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/*
			 * NOTE(review): this local declaration shadows
			 * the *bp parameter, so the split pair is never
			 * passed back for the caller to release --
			 * looks like a leak; confirm against upstream.
			 */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* after the first clone, don't block waiting for memory */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* The original chain must have covered the requested length */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
787
788 /*
789  * helpers for osd request op vectors.
790  */
791 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
792                                         int opcode, u32 payload_len)
793 {
794         struct ceph_osd_req_op *ops;
795
796         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
797         if (!ops)
798                 return NULL;
799
800         ops[0].op = opcode;
801
802         /*
803          * op extent offset and length will be set later on
804          * in calc_raw_layout()
805          */
806         ops[0].payload_len = payload_len;
807
808         return ops;
809 }
810
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
815
/*
 * Record completion of sub-request @index of collection @coll, then
 * push completion of the block-layer request @rq forward over the
 * contiguous run of finished sub-requests (the block layer requires
 * in-order completion of request segments).
 *
 * With no collection (a request that wasn't split) the whole request
 * is ended directly.  Must not be called with q->queue_lock held.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	/* find how far past num_done the finished run now extends */
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* one collection reference is held per sub-request */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
853
/*
 * Convenience wrapper: complete the collection slot belonging to a
 * single rbd request.
 */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
859
/*
 * Send ceph osd request
 *
 * Builds an osd request for a single object extent and submits it.
 * When @rbd_cb is NULL the call is synchronous: it waits for the
 * reply, optionally returns the osd's reassert version through @ver,
 * and drops the request itself; otherwise completion is handled by
 * the callback.  When @linger_req is non-NULL the request is
 * registered as lingering (used for the header watch) and passed
 * back through it.
 *
 * On failure, the collection slot (if any) is completed with the
 * error before returning it.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	/*
	 * NOTE(review): the request head snapid is left at CEPH_NOSNAP;
	 * @snapid appears to be applied via ceph_calc_raw_layout()
	 * below instead -- confirm before changing.
	 */
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* One object per "file": stripe unit == object size, count 1 */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous: wait for the reply here */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
971
/*
 * Ceph osd op callback: completion handler for async OSD requests
 * submitted by rbd_do_op().  Runs when the reply message arrives.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);   /* first op follows the fixed header */
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
                (unsigned long long) bytes, read_op, (int) rc);

        if (rc == -ENOENT && read_op) {
                /* hole in a sparse image: the object doesn't exist,
                 * which simply means "all zeroes" to the reader */
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                /* short read: zero-fill the tail and report full length */
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        /* report completion to the collection before dropping our refs */
        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}
1011
/*
 * Minimal completion callback for fire-and-forget requests (e.g. the
 * notify ack): just drop the request reference taken at submission.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1016
/*
 * Do a synchronous ceph osd operation.
 *
 * Builds (if @orig_ops is NULL) or reuses (@orig_ops) the op vector,
 * stages @buf into a page vector for writes, submits through
 * rbd_do_request() with no callback (which makes it wait for
 * completion), and copies read data back into @buf.
 *
 * Returns the byte count from the osd on success, negative errno on
 * failure.  @ver, if non-NULL, receives the object version.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int opcode,
                           int flags,
                           struct ceph_osd_req_op *orig_ops,
                           const char *object_name,
                           u64 ofs, u64 len,
                           char *buf,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;
        struct ceph_osd_req_op *ops = orig_ops;
        u32 payload_len;

        /* page vector is allocated even for 0-length data ops; the osd
         * client requires a (possibly empty) data container */
        num_pages = calc_pages_for(ofs , len);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        if (!orig_ops) {
                /* caller supplied no ops: build a single read/write op */
                payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
                ret = -ENOMEM;
                ops = rbd_create_rw_ops(1, opcode, payload_len);
                if (!ops)
                        goto done;

                if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
                        ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
                        if (ret < 0)
                                goto done_ops;
                }
        }

        /* NULL rbd_cb => rbd_do_request waits for the reply */
        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                          object_name, ofs, len, NULL,
                          pages, num_pages,
                          flags,
                          ops,
                          NULL, 0,
                          NULL,
                          linger_req, ver);
        if (ret < 0)
                goto done_ops;

        /* ret holds the number of bytes the osd actually returned */
        if ((flags & CEPH_OSD_FLAG_READ) && buf)
                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
        /* only free ops we built ourselves; caller owns orig_ops */
        if (!orig_ops)
                rbd_destroy_ops(ops);
done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}
1078
/*
 * Do an asynchronous ceph osd operation.
 *
 * Maps the image extent [@ofs, @ofs+@len) to a single backing object
 * segment, builds a one-op request and submits it with rbd_req_cb as
 * the completion callback.  The extent must not cross a segment
 * boundary (the bios were already split at segment boundaries).
 */
static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 snapid,
                     int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
                     int coll_index)
{
        char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;

        /* GFP_NOIO: we're on the block I/O path, must not recurse */
        seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
        if (!seg_name)
                return -ENOMEM;

        /* translate image offset into (object name, offset, length) */
        seg_len = rbd_get_segment(&rbd_dev->header,
                                  rbd_dev->header.object_prefix,
                                  ofs, len,
                                  seg_name, &seg_ofs);

        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
        if (!ops)
                goto done;

        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
        BUG_ON(seg_len < len);

        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
                             bio,
                             NULL, 0,
                             flags,
                             ops,
                             coll, coll_index,
                             rbd_req_cb, 0, NULL);

        /* request holds its own copies; ops can go now */
        rbd_destroy_ops(ops);
done:
        kfree(seg_name);
        return ret;
}
1134
1135 /*
1136  * Request async osd write
1137  */
1138 static int rbd_req_write(struct request *rq,
1139                          struct rbd_device *rbd_dev,
1140                          struct ceph_snap_context *snapc,
1141                          u64 ofs, u64 len,
1142                          struct bio *bio,
1143                          struct rbd_req_coll *coll,
1144                          int coll_index)
1145 {
1146         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1147                          CEPH_OSD_OP_WRITE,
1148                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1149                          ofs, len, bio, coll, coll_index);
1150 }
1151
1152 /*
1153  * Request async osd read
1154  */
1155 static int rbd_req_read(struct request *rq,
1156                          struct rbd_device *rbd_dev,
1157                          u64 snapid,
1158                          u64 ofs, u64 len,
1159                          struct bio *bio,
1160                          struct rbd_req_coll *coll,
1161                          int coll_index)
1162 {
1163         return rbd_do_op(rq, rbd_dev, NULL,
1164                          snapid,
1165                          CEPH_OSD_OP_READ,
1166                          CEPH_OSD_FLAG_READ,
1167                          ofs, len, bio, coll, coll_index);
1168 }
1169
1170 /*
1171  * Request sync osd read
1172  */
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1174                           u64 snapid,
1175                           const char *object_name,
1176                           u64 ofs, u64 len,
1177                           char *buf,
1178                           u64 *ver)
1179 {
1180         return rbd_req_sync_op(rbd_dev, NULL,
1181                                snapid,
1182                                CEPH_OSD_OP_READ,
1183                                CEPH_OSD_FLAG_READ,
1184                                NULL,
1185                                object_name, ofs, len, buf, NULL, ver);
1186 }
1187
/*
 * Acknowledge a watch notification on the header object.  Despite the
 * "sync" naming pattern nearby, this is submitted asynchronously: the
 * completion callback (rbd_simple_req_cb) just drops the request.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = cpu_to_le64(ver);
        /* NOTE(review): notify_id is stored without cpu_to_le64, unlike
         * ver above -- confirm which byte order the wire format expects */
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                          rbd_dev->header_name, 0, 0, NULL,
                          NULL, 0,
                          CEPH_OSD_FLAG_READ,
                          ops,
                          NULL, 0,
                          rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}
1217
1218 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1219 {
1220         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1221         u64 hver;
1222         int rc;
1223
1224         if (!rbd_dev)
1225                 return;
1226
1227         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1228                 rbd_dev->header_name, (unsigned long long) notify_id,
1229                 (unsigned int) opcode);
1230         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1231         rc = __rbd_refresh_header(rbd_dev);
1232         hver = rbd_dev->header.obj_version;
1233         mutex_unlock(&ctl_mutex);
1234         if (rc)
1235                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236                            " update snaps: %d\n", rbd_dev->major, rc);
1237
1238         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1239 }
1240
/*
 * Request sync osd watch: register a persistent watch on the header
 * object so rbd_watch_cb() fires when the image header changes.  The
 * lingering osd request is stashed in rbd_dev->watch_request and the
 * event in rbd_dev->watch_event; rbd_req_sync_unwatch() tears both
 * down.  Returns 0 on success, negative errno on failure.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        /* the event routes incoming notifications to rbd_watch_cb */
        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;  /* 1 == register the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1285
/*
 * Request sync osd unwatch: deregister the header-object watch set up
 * by rbd_req_sync_watch() and cancel the associated event.  The event
 * is cancelled regardless of whether the unwatch op itself succeeded.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;  /* 0 == remove the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);


        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}
1316
/* Opaque context handed to ceph_osdc_create_event() for the notify
 * path; delivered back to rbd_notify_cb() as its data pointer. */
struct rbd_notify_info {
        struct rbd_device *rbd_dev;
};
1320
1321 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1322 {
1323         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1324         if (!rbd_dev)
1325                 return;
1326
1327         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1328                         rbd_dev->header_name, (unsigned long long) notify_id,
1329                         (unsigned int) opcode);
1330 }
1331
/*
 * Request sync osd notify: send a NOTIFY op on the header object and
 * wait (bounded by CEPH_OSD_TIMEOUT_DEFAULT) for the notification to
 * complete a round trip through our own event.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        /* payload: u32 version + u32 timeout */
        int payload_len = sizeof(u32) + sizeof(u32);
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
        if (!ops)
                return -ENOMEM;

        info.rbd_dev = rbd_dev;

        /* one_shot event (third arg == 1): fires at most once */
        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
                                     (void *)&info, &event);
        if (ret < 0)
                goto fail;

        /* NOTE(review): ver/cookie are assigned without cpu_to_le64,
         * unlike the watch setup path -- confirm intended byte order */
        ops[0].watch.ver = 1;
        ops[0].watch.flag = 1;
        ops[0].watch.cookie = event->cookie;
        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
        ops[0].watch.timeout = 12;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
                               rbd_dev->header_name,
                               0, 0, NULL, NULL, NULL);
        if (ret < 0)
                goto fail_event;

        /* NOTE(review): the event is not cancelled on the success path;
         * verify ceph_osdc_wait_event consumes it or this leaks */
        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
        dout("ceph_osdc_wait_event returned %d\n", ret);
        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(event);
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1382
/*
 * Synchronously execute an OSD class method (CEPH_OSD_OP_CALL) on
 * @object_name, e.g. class "rbd", method "snap_add", with @data as
 * the method's input payload.  @ver, if non-NULL, receives the
 * resulting object version.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int ret;

        /* payload carries class name + method name + input data */
        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
                                    class_name_len + method_name_len + len);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
                               object_name, 0, 0, NULL, NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}
1424
1425 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1426 {
1427         struct rbd_req_coll *coll =
1428                         kzalloc(sizeof(struct rbd_req_coll) +
1429                                 sizeof(struct rbd_req_status) * num_reqs,
1430                                 GFP_ATOMIC);
1431
1432         if (!coll)
1433                 return NULL;
1434         coll->total = num_reqs;
1435         kref_init(&coll->kref);
1436         return coll;
1437 }
1438
/*
 * block device queue callback: pulls requests off the queue, splits
 * each into per-object-segment bios, and submits one async OSD op per
 * segment.  Entered with q->queue_lock held; the lock is dropped
 * around the (sleeping) submission work and re-taken before fetching
 * the next request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                /* peek at request from block layer */
                if (!rq)
                        break;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                /* submission sleeps; can't hold the queue lock */
                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                /* mapped snapshot may have been deleted under us */
                if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* pin the snap context so a concurrent header refresh
                 * can't free it while our writes are in flight */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                /* one collection slot per object segment touched */
                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_get_segment(&rbd_dev->header,
                                                  rbd_dev->header.object_prefix,
                                                  ofs, size,
                                                  NULL, NULL);
                        /* extra ref per in-flight segment; dropped in
                         * rbd_coll_end_req */
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                /* fail just this segment; others proceed */
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                /* drop the initial ref; last segment completion frees coll */
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1558
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be added to the bio
 * described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;     /* sectors per backing object */
        sector_t sector;                /* absolute start sector of the bio */
        unsigned int bio_sectors;       /* sectors already in the bio */
        int max;

        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        /* bytes remaining before the end of the current object
         * (chunk_sectors is a power of two, so & gives sector % chunk) */
        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
                                 + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        /* empty bio: accept a boundary-crossing single bvec, split later */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}
1585
/*
 * Tear down the gendisk (and its request queue) for @rbd_dev and free
 * the in-core image header.  Safe to call when no disk was created.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_header_free(&rbd_dev->header);

        /* only del_gendisk if add_disk() already ran */
        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
                blk_cleanup_queue(disk->queue);
        put_disk(disk);
}
1601
/*
 * Reload the on-disk image header into @header.
 *
 * The snapshot count can change between reads, so loop: read, parse,
 * and retry with a larger buffer until the count seen in the parsed
 * header matches the count the buffer was sized for.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        ssize_t rc;
        struct rbd_image_header_ondisk *dh;
        u32 snap_count = 0;
        u64 ver;
        size_t len;

        /*
         * First reads the fixed-size header to determine the number
         * of snapshots, then re-reads it, along with all snapshot
         * records as well as their stored names.
         */
        len = sizeof (*dh);
        while (1) {
                dh = kmalloc(len, GFP_KERNEL);
                if (!dh)
                        return -ENOMEM;

                rc = rbd_req_sync_read(rbd_dev,
                                       CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, len,
                                       (char *)dh, &ver);
                if (rc < 0)
                        goto out_dh;

                rc = rbd_header_from_disk(header, dh, snap_count);
                if (rc < 0) {
                        if (rc == -ENXIO)
                                pr_warning("unrecognized header format"
                                           " for image %s\n",
                                           rbd_dev->image_name);
                        goto out_dh;
                }

                /* buffer was big enough for all snaps: done */
                if (snap_count == header->total_snaps)
                        break;

                /* resize for the snap records + names and try again */
                snap_count = header->total_snaps;
                len = sizeof (*dh) +
                        snap_count * sizeof(struct rbd_image_snap_ondisk) +
                        header->snap_names_len;

                rbd_header_free(header);
                kfree(dh);
        }
        header->obj_version = ver;

out_dh:
        kfree(dh);
        return rc;
}
1659
1660 /*
1661  * create a snapshot
1662  */
1663 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1664                                const char *snap_name,
1665                                gfp_t gfp_flags)
1666 {
1667         int name_len = strlen(snap_name);
1668         u64 new_snapid;
1669         int ret;
1670         void *data, *p, *e;
1671         struct ceph_mon_client *monc;
1672
1673         /* we should create a snapshot only if we're pointing at the head */
1674         if (rbd_dev->snap_id != CEPH_NOSNAP)
1675                 return -EINVAL;
1676
1677         monc = &rbd_dev->rbd_client->client->monc;
1678         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1679         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1680         if (ret < 0)
1681                 return ret;
1682
1683         data = kmalloc(name_len + 16, gfp_flags);
1684         if (!data)
1685                 return -ENOMEM;
1686
1687         p = data;
1688         e = data + name_len + 16;
1689
1690         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1691         ceph_encode_64_safe(&p, e, new_snapid, bad);
1692
1693         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1694                                 "rbd", "snap_add",
1695                                 data, p - data, NULL);
1696
1697         kfree(data);
1698
1699         return ret < 0 ? ret : 0;
1700 bad:
1701         return -ERANGE;
1702 }
1703
/*
 * Remove every snapshot device from rbd_dev->snaps.  Caller is
 * expected to hold the appropriate lock (the list is walked with the
 * _safe iterator because each entry is destroyed as we go).
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                __rbd_remove_snap_dev(snap);
}
1712
/*
 * Re-read the on-disk header and swap the fresh contents into
 * rbd_dev->header under header_rwsem, then rebuild the snapshot
 * device list.  Caller must hold ctl_mutex.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? (only the head can change size; snaps are frozen) */
        if (rbd_dev->snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                dout("setting size to %llu sectors", (unsigned long long) size);
                set_capacity(rbd_dev->disk, size);
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        /* take ownership of the newly read fields */
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = __rbd_init_snaps_header(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1758
/*
 * Read the image header, build the snapshot list, and create the
 * gendisk + request queue for the mapped image.  On success the disk
 * has been announced via add_disk().  Returns 0 or negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
        u64 segment_size;
        u64 total_size = 0;

        /* contact OSD, request size info about the object being mapped */
        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (rc)
                return rc;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = __rbd_init_snaps_header(rbd_dev);
        if (rc)
                return rc;

        /* resolve the mapped snapshot (or head) and its size */
        rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;

        /* create gendisk info */
        rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                goto out;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        /* keep bios within one object (see rbd_merge_bvec) */
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;
        rbd_dev->q = q;

        /* finally, announce the disk to the world */
        set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);

        pr_info("%s: added with size 0x%llx\n",
                disk->disk_name, (unsigned long long)total_size);
        return 0;

out_disk:
        put_disk(disk);
out:
        return rc;
}
1831
1832 /*
1833   sysfs
1834 */
1835
1836 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1837 {
1838         return container_of(dev, struct rbd_device, dev);
1839 }
1840
1841 static ssize_t rbd_size_show(struct device *dev,
1842                              struct device_attribute *attr, char *buf)
1843 {
1844         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1845         sector_t size;
1846
1847         down_read(&rbd_dev->header_rwsem);
1848         size = get_capacity(rbd_dev->disk);
1849         up_read(&rbd_dev->header_rwsem);
1850
1851         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1852 }
1853
1854 static ssize_t rbd_major_show(struct device *dev,
1855                               struct device_attribute *attr, char *buf)
1856 {
1857         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1858
1859         return sprintf(buf, "%d\n", rbd_dev->major);
1860 }
1861
1862 static ssize_t rbd_client_id_show(struct device *dev,
1863                                   struct device_attribute *attr, char *buf)
1864 {
1865         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1866
1867         return sprintf(buf, "client%lld\n",
1868                         ceph_client_id(rbd_dev->rbd_client->client));
1869 }
1870
1871 static ssize_t rbd_pool_show(struct device *dev,
1872                              struct device_attribute *attr, char *buf)
1873 {
1874         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1875
1876         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1877 }
1878
1879 static ssize_t rbd_pool_id_show(struct device *dev,
1880                              struct device_attribute *attr, char *buf)
1881 {
1882         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1883
1884         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1885 }
1886
1887 static ssize_t rbd_name_show(struct device *dev,
1888                              struct device_attribute *attr, char *buf)
1889 {
1890         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1891
1892         return sprintf(buf, "%s\n", rbd_dev->image_name);
1893 }
1894
1895 static ssize_t rbd_snap_show(struct device *dev,
1896                              struct device_attribute *attr,
1897                              char *buf)
1898 {
1899         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1900
1901         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1902 }
1903
1904 static ssize_t rbd_image_refresh(struct device *dev,
1905                                  struct device_attribute *attr,
1906                                  const char *buf,
1907                                  size_t size)
1908 {
1909         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1910         int rc;
1911         int ret = size;
1912
1913         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1914
1915         rc = __rbd_refresh_header(rbd_dev);
1916         if (rc < 0)
1917                 ret = rc;
1918
1919         mutex_unlock(&ctl_mutex);
1920         return ret;
1921 }
1922
/* Per-device sysfs attributes (under /sys/bus/rbd/devices/<id>/) */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);	/* write-only trigger */
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);	/* write-only trigger */

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

/* Attribute group hooked up through rbd_device_type's ->groups */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1954
/*
 * Intentionally empty.  NOTE(review): rbd_bus_add_dev() also installs
 * dev->release = rbd_dev_release, which does the actual freeing; this
 * type-level release presumably exists only to silence the device-core
 * "no release function" warning — confirm before relying on it.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1958
/* Device type for mapped rbd images; attaches the sysfs attribute groups */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1964
1965
1966 /*
1967   sysfs - snapshots
1968 */
1969
1970 static ssize_t rbd_snap_size_show(struct device *dev,
1971                                   struct device_attribute *attr,
1972                                   char *buf)
1973 {
1974         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1975
1976         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1977 }
1978
1979 static ssize_t rbd_snap_id_show(struct device *dev,
1980                                 struct device_attribute *attr,
1981                                 char *buf)
1982 {
1983         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1984
1985         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1986 }
1987
/* Read-only sysfs attributes for each snapshot's directory */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2000
/*
 * Device-core release callback: frees the rbd_snap (and its name)
 * once the last reference on its embedded struct device is dropped.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
2007
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Device type for per-snapshot sysfs entries */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2017
/*
 * Unlink a snapshot from its device's snaps list and unregister its
 * sysfs device; the rbd_snap itself is freed by rbd_snap_dev_release()
 * when the device's final reference goes away.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2023
2024 static int rbd_register_snap_dev(struct rbd_snap *snap,
2025                                   struct device *parent)
2026 {
2027         struct device *dev = &snap->dev;
2028         int ret;
2029
2030         dev->type = &rbd_snap_device_type;
2031         dev->parent = parent;
2032         dev->release = rbd_snap_dev_release;
2033         dev_set_name(dev, "snap_%s", snap->name);
2034         ret = device_register(dev);
2035
2036         return ret;
2037 }
2038
2039 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2040                                               int i, const char *name)
2041 {
2042         struct rbd_snap *snap;
2043         int ret;
2044
2045         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2046         if (!snap)
2047                 return ERR_PTR(-ENOMEM);
2048
2049         ret = -ENOMEM;
2050         snap->name = kstrdup(name, GFP_KERNEL);
2051         if (!snap->name)
2052                 goto err;
2053
2054         snap->size = rbd_dev->header.snap_sizes[i];
2055         snap->id = rbd_dev->header.snapc->snaps[i];
2056         if (device_is_registered(&rbd_dev->dev)) {
2057                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2058                 if (ret < 0)
2059                         goto err;
2060         }
2061
2062         return snap;
2063
2064 err:
2065         kfree(snap->name);
2066         kfree(snap);
2067
2068         return ERR_PTR(ret);
2069 }
2070
2071 /*
2072  * search for the previous snap in a null delimited string list
2073  */
2074 const char *rbd_prev_snap_name(const char *name, const char *start)
2075 {
2076         if (name < start + 2)
2077                 return NULL;
2078
2079         name -= 2;
2080         while (*name) {
2081                 if (name == start)
2082                         return start;
2083                 name--;
2084         }
2085         return name + 1;
2086 }
2087
2088 /*
2089  * compare the old list of snapshots that we have to what's in the header
2090  * and update it accordingly. Note that the header holds the snapshots
2091  * in a reverse order (from newest to oldest) and we need to go from
2092  * older to new so that we don't get a duplicate snap name when
2093  * doing the process (e.g., removed snapshot and recreated a new
2094  * one with the same name.
2095  */
2096 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2097 {
2098         const char *name, *first_name;
2099         int i = rbd_dev->header.total_snaps;
2100         struct rbd_snap *snap, *old_snap = NULL;
2101         struct list_head *p, *n;
2102
2103         first_name = rbd_dev->header.snap_names;
2104         name = first_name + rbd_dev->header.snap_names_len;
2105
2106         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2107                 u64 cur_id;
2108
2109                 old_snap = list_entry(p, struct rbd_snap, node);
2110
2111                 if (i)
2112                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2113
2114                 if (!i || old_snap->id < cur_id) {
2115                         /*
2116                          * old_snap->id was skipped, thus was
2117                          * removed.  If this rbd_dev is mapped to
2118                          * the removed snapshot, record that it no
2119                          * longer exists, to prevent further I/O.
2120                          */
2121                         if (rbd_dev->snap_id == old_snap->id)
2122                                 rbd_dev->snap_exists = false;
2123                         __rbd_remove_snap_dev(old_snap);
2124                         continue;
2125                 }
2126                 if (old_snap->id == cur_id) {
2127                         /* we have this snapshot already */
2128                         i--;
2129                         name = rbd_prev_snap_name(name, first_name);
2130                         continue;
2131                 }
2132                 for (; i > 0;
2133                      i--, name = rbd_prev_snap_name(name, first_name)) {
2134                         if (!name) {
2135                                 WARN_ON(1);
2136                                 return -EINVAL;
2137                         }
2138                         cur_id = rbd_dev->header.snapc->snaps[i];
2139                         /* snapshot removal? handle it above */
2140                         if (cur_id >= old_snap->id)
2141                                 break;
2142                         /* a new snapshot */
2143                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2144                         if (IS_ERR(snap))
2145                                 return PTR_ERR(snap);
2146
2147                         /* note that we add it backward so using n and not p */
2148                         list_add(&snap->node, n);
2149                         p = &snap->node;
2150                 }
2151         }
2152         /* we're done going over the old snap list, just add what's left */
2153         for (; i > 0; i--) {
2154                 name = rbd_prev_snap_name(name, first_name);
2155                 if (!name) {
2156                         WARN_ON(1);
2157                         return -EINVAL;
2158                 }
2159                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2160                 if (IS_ERR(snap))
2161                         return PTR_ERR(snap);
2162                 list_add(&snap->node, &rbd_dev->snaps);
2163         }
2164
2165         return 0;
2166 }
2167
/*
 * Register the rbd device (and every snapshot already on its list)
 * with the driver core under the rbd bus.  Returns 0 or the first
 * registration error; a snapshot registration failure leaves earlier
 * snaps registered (cleanup happens via rbd_bus_del_dev()).
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* rbd_dev_release() frees rbd_dev when the last ref is dropped */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2195
/*
 * Remove the rbd device from the bus; the resulting release callback
 * (rbd_dev_release) tears down the rest of the device state.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2200
2201 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2202 {
2203         int ret, rc;
2204
2205         do {
2206                 ret = rbd_req_sync_watch(rbd_dev);
2207                 if (ret == -ERANGE) {
2208                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2209                         rc = __rbd_refresh_header(rbd_dev);
2210                         mutex_unlock(&ctl_mutex);
2211                         if (rc < 0)
2212                                 return rc;
2213                 }
2214         } while (ret == -ERANGE);
2215
2216         return ret;
2217 }
2218
/* Highest device id handed out so far; see rbd_id_put() for how the
 * maximum is reclaimed when a device goes away. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2233
2234 /*
2235  * Remove an rbd_dev from the global list, and record that its
2236  * identifier is no longer in use.
2237  */
2238 static void rbd_id_put(struct rbd_device *rbd_dev)
2239 {
2240         struct list_head *tmp;
2241         int rbd_id = rbd_dev->dev_id;
2242         int max_id;
2243
2244         BUG_ON(rbd_id < 1);
2245
2246         spin_lock(&rbd_dev_list_lock);
2247         list_del_init(&rbd_dev->node);
2248
2249         /*
2250          * If the id being "put" is not the current maximum, there
2251          * is nothing special we need to do.
2252          */
2253         if (rbd_id != atomic64_read(&rbd_id_max)) {
2254                 spin_unlock(&rbd_dev_list_lock);
2255                 return;
2256         }
2257
2258         /*
2259          * We need to update the current maximum id.  Search the
2260          * list to find out what it is.  We're more likely to find
2261          * the maximum at the end, so search the list backward.
2262          */
2263         max_id = 0;
2264         list_for_each_prev(tmp, &rbd_dev_list) {
2265                 struct rbd_device *rbd_dev;
2266
2267                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2268                 if (rbd_id > max_id)
2269                         max_id = rbd_id;
2270         }
2271         spin_unlock(&rbd_dev_list_lock);
2272
2273         /*
2274          * The max id could have been updated by rbd_id_get(), in
2275          * which case it now accurately reflects the new maximum.
2276          * Be careful not to overwrite the maximum value in that
2277          * case.
2278          */
2279         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2280 }
2281
2282 /*
2283  * Skips over white space at *buf, and updates *buf to point to the
2284  * first found non-space character (if any). Returns the length of
2285  * the token (string of non-white space characters) found.  Note
2286  * that *buf must be terminated with '\0'.
2287  */
2288 static inline size_t next_token(const char **buf)
2289 {
2290         /*
2291         * These are the characters that produce nonzero for
2292         * isspace() in the "C" and "POSIX" locales.
2293         */
2294         const char *spaces = " \f\n\r\t\v";
2295
2296         *buf += strspn(*buf, spaces);   /* Find start of token */
2297
2298         return strcspn(*buf, spaces);   /* Return token length */
2299 }
2300
2301 /*
2302  * Finds the next token in *buf, and if the provided token buffer is
2303  * big enough, copies the found token into it.  The result, if
2304  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2305  * must be terminated with '\0' on entry.
2306  *
2307  * Returns the length of the token found (not including the '\0').
2308  * Return value will be 0 if no token is found, and it will be >=
2309  * token_size if the token would not fit.
2310  *
2311  * The *buf pointer will be updated to point beyond the end of the
2312  * found token.  Note that this occurs even if the token buffer is
2313  * too small to hold it.
2314  */
2315 static inline size_t copy_token(const char **buf,
2316                                 char *token,
2317                                 size_t token_size)
2318 {
2319         size_t len;
2320
2321         len = next_token(buf);
2322         if (len < token_size) {
2323                 memcpy(token, *buf, len);
2324                 *(token + len) = '\0';
2325         }
2326         *buf += len;
2327
2328         return len;
2329 }
2330
2331 /*
2332  * Finds the next token in *buf, dynamically allocates a buffer big
2333  * enough to hold a copy of it, and copies the token into the new
2334  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2335  * that a duplicate buffer is created even for a zero-length token.
2336  *
2337  * Returns a pointer to the newly-allocated duplicate, or a null
2338  * pointer if memory for the duplicate was not available.  If
2339  * the lenp argument is a non-null pointer, the length of the token
2340  * (not including the '\0') is returned in *lenp.
2341  *
2342  * If successful, the *buf pointer will be updated to point beyond
2343  * the end of the found token.
2344  *
2345  * Note: uses GFP_KERNEL for allocation.
2346  */
2347 static inline char *dup_token(const char **buf, size_t *lenp)
2348 {
2349         char *dup;
2350         size_t len;
2351
2352         len = next_token(buf);
2353         dup = kmalloc(len + 1, GFP_KERNEL);
2354         if (!dup)
2355                 return NULL;
2356
2357         memcpy(dup, *buf, len);
2358         *(dup + len) = '\0';
2359         *buf += len;
2360
2361         if (lenp)
2362                 *lenp = len;
2363
2364         return dup;
2365 }
2366
2367 /*
2368  * This fills in the pool_name, image_name, image_name_len, snap_name,
2369  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2370  * on the list of monitor addresses and other options provided via
2371  * /sys/bus/rbd/add.
2372  *
2373  * Note: rbd_dev is assumed to have been initially zero-filled.
2374  */
2375 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2376                               const char *buf,
2377                               const char **mon_addrs,
2378                               size_t *mon_addrs_size,
2379                               char *options,
2380                              size_t options_size)
2381 {
2382         size_t len;
2383         int ret;
2384
2385         /* The first four tokens are required */
2386
2387         len = next_token(&buf);
2388         if (!len)
2389                 return -EINVAL;
2390         *mon_addrs_size = len + 1;
2391         *mon_addrs = buf;
2392
2393         buf += len;
2394
2395         len = copy_token(&buf, options, options_size);
2396         if (!len || len >= options_size)
2397                 return -EINVAL;
2398
2399         ret = -ENOMEM;
2400         rbd_dev->pool_name = dup_token(&buf, NULL);
2401         if (!rbd_dev->pool_name)
2402                 goto out_err;
2403
2404         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2405         if (!rbd_dev->image_name)
2406                 goto out_err;
2407
2408         /* Create the name of the header object */
2409
2410         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2411                                                 + sizeof (RBD_SUFFIX),
2412                                         GFP_KERNEL);
2413         if (!rbd_dev->header_name)
2414                 goto out_err;
2415         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2416
2417         /*
2418          * The snapshot name is optional.  If none is is supplied,
2419          * we use the default value.
2420          */
2421         rbd_dev->snap_name = dup_token(&buf, &len);
2422         if (!rbd_dev->snap_name)
2423                 goto out_err;
2424         if (!len) {
2425                 /* Replace the empty name with the default */
2426                 kfree(rbd_dev->snap_name);
2427                 rbd_dev->snap_name
2428                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2429                 if (!rbd_dev->snap_name)
2430                         goto out_err;
2431
2432                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2433                         sizeof (RBD_SNAP_HEAD_NAME));
2434         }
2435
2436         return 0;
2437
2438 out_err:
2439         kfree(rbd_dev->header_name);
2440         kfree(rbd_dev->image_name);
2441         kfree(rbd_dev->pool_name);
2442         rbd_dev->pool_name = NULL;
2443
2444         return ret;
2445 }
2446
/*
 * Handle a write to /sys/bus/rbd/add: parse the mapping description
 * in @buf, connect to the cluster, register the block device and its
 * sysfs representation, and start watching the image header.
 *
 * Returns @count on success, or a negative errno.  Before
 * rbd_bus_add_dev() succeeds, cleanup is done explicitly here; after
 * it, cleanup is delegated to the device release path (see the
 * err_out_bus comment below).
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* hold a module reference per mapped device */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */
	/* (rbd_dev_release() frees rbd_dev and drops the module ref) */
	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name is non-NULL only if parse_args fully succeeded */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2556
2557 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2558 {
2559         struct list_head *tmp;
2560         struct rbd_device *rbd_dev;
2561
2562         spin_lock(&rbd_dev_list_lock);
2563         list_for_each(tmp, &rbd_dev_list) {
2564                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2565                 if (rbd_dev->dev_id == dev_id) {
2566                         spin_unlock(&rbd_dev_list_lock);
2567                         return rbd_dev;
2568                 }
2569         }
2570         spin_unlock(&rbd_dev_list_lock);
2571         return NULL;
2572 }
2573
/*
 * Device-core release callback installed by rbd_bus_add_dev().
 * Runs when the device's last reference is dropped: tears down the
 * header watch, releases the ceph client, the gendisk, the blkdev
 * major, the device id, and finally the rbd_dev itself, then drops
 * the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2604
/*
 * Handle a write to /sys/bus/rbd/remove: parse the device id from
 * @buf and unregister the matching device along with its snapshots.
 * Returns @count on success, -ENOENT for an unknown id, or a parse
 * error.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* remaining teardown happens in rbd_dev_release() */
	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2638
2639 static ssize_t rbd_snap_add(struct device *dev,
2640                             struct device_attribute *attr,
2641                             const char *buf,
2642                             size_t count)
2643 {
2644         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2645         int ret;
2646         char *name = kmalloc(count + 1, GFP_KERNEL);
2647         if (!name)
2648                 return -ENOMEM;
2649
2650         snprintf(name, count, "%s", buf);
2651
2652         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2653
2654         ret = rbd_header_add_snap(rbd_dev,
2655                                   name, GFP_KERNEL);
2656         if (ret < 0)
2657                 goto err_unlock;
2658
2659         ret = __rbd_refresh_header(rbd_dev);
2660         if (ret < 0)
2661                 goto err_unlock;
2662
2663         /* shouldn't hold ctl_mutex when notifying.. notify might
2664            trigger a watch callback that would need to get that mutex */
2665         mutex_unlock(&ctl_mutex);
2666
2667         /* make a best effort, don't error if failed */
2668         rbd_req_sync_notify(rbd_dev);
2669
2670         ret = count;
2671         kfree(name);
2672         return ret;
2673
2674 err_unlock:
2675         mutex_unlock(&ctl_mutex);
2676         kfree(name);
2677         return ret;
2678 }
2679
2680 /*
2681  * create control files in sysfs
2682  * /sys/bus/rbd/...
2683  */
2684 static int rbd_sysfs_init(void)
2685 {
2686         int ret;
2687
2688         ret = device_register(&rbd_root_dev);
2689         if (ret < 0)
2690                 return ret;
2691
2692         ret = bus_register(&rbd_bus_type);
2693         if (ret < 0)
2694                 device_unregister(&rbd_root_dev);
2695
2696         return ret;
2697 }
2698
2699 static void rbd_sysfs_cleanup(void)
2700 {
2701         bus_unregister(&rbd_bus_type);
2702         device_unregister(&rbd_root_dev);
2703 }
2704
2705 int __init rbd_init(void)
2706 {
2707         int rc;
2708
2709         rc = rbd_sysfs_init();
2710         if (rc)
2711                 return rc;
2712         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2713         return 0;
2714 }
2715
2716 void __exit rbd_exit(void)
2717 {
2718         rbd_sysfs_cleanup();
2719 }
2720
/* Module entry/exit points and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");