From: Kent Overstreet Date: Thu, 25 Jul 2013 00:26:39 +0000 (-0700) Subject: bcache: Refactor journalling flow control X-Git-Tag: next-20130912~34^2~6^2~37 X-Git-Url: https://git.karo-electronics.de/?a=commitdiff_plain;h=419cf7f0cc4e32268ef8272269598b4f705c3757;p=karo-tx-linux.git bcache: Refactor journalling flow control Making things less asynchronous that don't need to be - bch_journal() only has to block when the journal or journal entry is full, which is emphatically not a fast path. So make it a normal function that just returns when it finishes, to make the code and control flow easier to follow. Signed-off-by: Kent Overstreet --- diff --git a/drivers/md/bcache/btree.c b/drivers/md/bcache/btree.c index c1a1af72f043..83ab685d1eb0 100644 --- a/drivers/md/bcache/btree.c +++ b/drivers/md/bcache/btree.c @@ -2152,9 +2152,6 @@ int bch_btree_insert(struct btree_op *op, struct cache_set *c, } } - if (op->journal) - atomic_dec_bug(op->journal); - op->journal = NULL; return ret; } diff --git a/drivers/md/bcache/closure.h b/drivers/md/bcache/closure.h index 00039924ea9d..ab011f03801f 100644 --- a/drivers/md/bcache/closure.h +++ b/drivers/md/bcache/closure.h @@ -642,7 +642,7 @@ do { \ #define continue_at_nobarrier(_cl, _fn, _wq) \ do { \ set_closure_fn(_cl, _fn, _wq); \ - closure_queue(cl); \ + closure_queue(_cl); \ return; \ } while (0) diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c index a08a147dcfe8..960b14741633 100644 --- a/drivers/md/bcache/journal.c +++ b/drivers/md/bcache/journal.c @@ -318,7 +318,6 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list, bch_keylist_push(&op->keys); op->journal = i->pin; - atomic_inc(op->journal); ret = bch_btree_insert(op, s, &op->keys); if (ret) @@ -357,48 +356,35 @@ static void btree_flush_write(struct cache_set *c) * Try to find the btree node with that references the oldest journal * entry, best is our current candidate and is locked if non NULL: */ - struct btree *b, *best = NULL; - unsigned iter; + struct btree *b, *best; + unsigned i; +retry: + best = NULL; + + for_each_cached_btree(b, c, i) + if (btree_current_write(b)->journal) { + if (!best) + best = b; + else if (journal_pin_cmp(c, + btree_current_write(best), + btree_current_write(b))) { + best = b; + } + } - for_each_cached_btree(b, c, iter) { - if (!down_write_trylock(&b->lock)) - continue; + b = best; + if (b) { + rw_lock(true, b, b->level); - if (!btree_node_dirty(b) || - !btree_current_write(b)->journal) { + if (!btree_current_write(b)->journal) { rw_unlock(true, b); - continue; + /* We raced */ + goto retry; } - if (!best) - best = b; - else if (journal_pin_cmp(c, - btree_current_write(best), - btree_current_write(b))) { - rw_unlock(true, best); - best = b; - } else - rw_unlock(true, b); + bch_btree_node_write(b, NULL); + rw_unlock(true, b); } - - if (best) - goto out; - - /* We can't find the best btree node, just pick the first */ - list_for_each_entry(b, &c->btree_cache, list) - if (!b->level && btree_node_dirty(b)) { - best = b; - rw_lock(true, best, best->level); - goto found; - } - -out: - if (!best) - return; -found: - if (btree_node_dirty(best)) - bch_btree_node_write(best, NULL); - rw_unlock(true, best); } #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) @@ -494,7 +480,7 @@ static void journal_reclaim(struct cache_set *c) do_journal_discard(ca); if (c->journal.blocks_free) - return; + goto out; /* * Allocate: @@ -520,7 +506,7 @@ static void journal_reclaim(struct cache_set *c) if (n) c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; - +out: if (!journal_full(&c->journal)) __closure_wake_up(&c->journal.wait); } @@ -588,23 +574,7 @@ static void journal_write_unlocked(struct closure *cl) struct bio_list list; bio_list_init(&list); - if (!w->need_write) { - /* - * XXX: have to unlock closure before we unlock journal lock, - * else we race with bch_journal(). But this way we race - * against cache set unregister. Doh. - */ - set_closure_fn(cl, NULL, NULL); - closure_sub(cl, CLOSURE_RUNNING + 1); - spin_unlock(&c->journal.lock); - return; - } else if (journal_full(&c->journal)) { - journal_reclaim(c); - spin_unlock(&c->journal.lock); - - btree_flush_write(c); - continue_at(cl, journal_write, system_wq); - } + BUG_ON(journal_full(&c->journal)); c->journal.blocks_free -= set_blocks(w->data, c); @@ -662,40 +632,79 @@ static void journal_write(struct closure *cl) struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl); spin_lock(&c->journal.lock); - journal_write_unlocked(cl); + + if (!c->journal.cur->need_write) { + /* + * XXX: have to unlock closure before we unlock journal lock, + * else we race with bch_journal(). But this way we race + * against cache set unregister. Doh. + */ + set_closure_fn(cl, NULL, NULL); + closure_sub(cl, CLOSURE_RUNNING + 1); + spin_unlock(&c->journal.lock); + } else + journal_write_unlocked(cl); } -static void __journal_try_write(struct cache_set *c, bool noflush) +static void journal_try_write(struct cache_set *c) __releases(c->journal.lock) { struct closure *cl = &c->journal.io.cl; - if (!closure_trylock(cl, &c->cl)) - spin_unlock(&c->journal.lock); - else if (noflush && journal_full(&c->journal)) { - spin_unlock(&c->journal.lock); - continue_at(cl, journal_write, system_wq); - } else + if (closure_trylock(cl, &c->cl)) journal_write_unlocked(cl); + else + spin_unlock(&c->journal.lock); } -#define journal_try_write(c) __journal_try_write(c, false) - -void bch_journal_meta(struct cache_set *c, struct closure *cl) +static struct journal_write *journal_wait_for_write(struct cache_set *c, + unsigned nkeys) { - struct journal_write *w; + size_t sectors; + struct closure cl; - if (CACHE_SYNC(&c->sb)) { - spin_lock(&c->journal.lock); + closure_init_stack(&cl); - w = c->journal.cur; - w->need_write = true; + spin_lock(&c->journal.lock); - if (cl) - BUG_ON(!closure_wait(&w->wait, cl)); + while (1) { + struct journal_write *w = c->journal.cur; - closure_flush(&c->journal.io); - __journal_try_write(c, true); + sectors = __set_blocks(w->data, w->data->keys + nkeys, + c) * c->sb.block_size; + + if (sectors <= min_t(size_t, + c->journal.blocks_free * c->sb.block_size, + PAGE_SECTORS << JSET_BITS)) + return w; + + /* XXX: tracepoint */ + if (!journal_full(&c->journal)) { + trace_bcache_journal_entry_full(c); + + /* + * XXX: If we were inserting so many keys that they + * won't fit in an _empty_ journal write, we'll + * deadlock. For now, handle this in + * bch_keylist_realloc() - but something to think about. + */ + BUG_ON(!w->data->keys || !w->need_write); + + closure_wait(&w->wait, &cl); + closure_flush(&c->journal.io); + journal_try_write(c); /* unlocks */ + } else { + trace_bcache_journal_full(c); + + closure_wait(&c->journal.wait, &cl); + journal_reclaim(c); + spin_unlock(&c->journal.lock); + + btree_flush_write(c); + } + + closure_sync(&cl); + spin_lock(&c->journal.lock); } } @@ -705,77 +714,45 @@ void bch_journal_meta(struct cache_set *c, struct closure *cl) * bch_journal() hands those same keys off to btree_insert_async() */ -void bch_journal(struct closure *cl) +atomic_t *bch_journal(struct cache_set *c, + struct keylist *keys, + struct closure *parent) { - struct btree_op *op = container_of(cl, struct btree_op, cl); - struct cache_set *c = op->c; struct journal_write *w; - size_t sectors, nkeys; - - if (op->type != BTREE_INSERT || - !CACHE_SYNC(&c->sb)) - goto out; - - /* - * If we're looping because we errored, might already be waiting on - * another journal write: - */ - while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING) - closure_sync(cl->parent); + atomic_t *ret; - spin_lock(&c->journal.lock); - - if (journal_full(&c->journal)) { - trace_bcache_journal_full(c); - - closure_wait(&c->journal.wait, cl); + if (!CACHE_SYNC(&c->sb)) + return NULL; - journal_reclaim(c); - spin_unlock(&c->journal.lock); - - btree_flush_write(c); - continue_at(cl, bch_journal, bcache_wq); - } + w = journal_wait_for_write(c, bch_keylist_nkeys(keys)); - w = c->journal.cur; + memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys)); + w->data->keys += bch_keylist_nkeys(keys); w->need_write = true; - nkeys = w->data->keys + bch_keylist_nkeys(&op->keys); - sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size; - if (sectors > min_t(size_t, - c->journal.blocks_free * c->sb.block_size, - PAGE_SECTORS << JSET_BITS)) { - trace_bcache_journal_entry_full(c); - - /* - * XXX: If we were inserting so many keys that they won't fit in - * an _empty_ journal write, we'll deadlock. For now, handle - * this in bch_keylist_realloc() - but something to think about. - */ - BUG_ON(!w->data->keys); - - BUG_ON(!closure_wait(&w->wait, cl)); + ret = &fifo_back(&c->journal.pin); + atomic_inc(ret); + if (parent) { + closure_wait(&w->wait, parent); closure_flush(&c->journal.io); - - journal_try_write(c); - continue_at(cl, bch_journal, bcache_wq); } - memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys)); - w->data->keys += bch_keylist_nkeys(&op->keys); + journal_try_write(c); - op->journal = &fifo_back(&c->journal.pin); - atomic_inc(op->journal); + return ret; +} - if (op->flush_journal) { - closure_flush(&c->journal.io); - closure_wait(&w->wait, cl->parent); - } +void bch_journal_meta(struct cache_set *c, struct closure *cl) +{ + struct keylist keys; + atomic_t *ref; - journal_try_write(c); -out: - bch_btree_insert_async(cl); + bch_keylist_init(&keys); + + ref = bch_journal(c, &keys, cl); + if (ref) + atomic_dec_bug(ref); } void bch_journal_free(struct cache_set *c) diff --git a/drivers/md/bcache/journal.h b/drivers/md/bcache/journal.h index 3d7851274b04..c64d525c8559 100644 --- a/drivers/md/bcache/journal.h +++ b/drivers/md/bcache/journal.h @@ -199,8 +199,9 @@ struct journal_device { struct closure; struct cache_set; struct btree_op; +struct keylist; -void bch_journal(struct closure *); +atomic_t *bch_journal(struct cache_set *, struct keylist *, struct closure *); void bch_journal_next(struct journal *); void bch_journal_mark(struct cache_set *, struct list_head *); void bch_journal_meta(struct cache_set *, struct closure *); diff --git a/drivers/md/bcache/movinggc.c b/drivers/md/bcache/movinggc.c index 1a3b4f4786c3..e5281f41bccd 100644 --- a/drivers/md/bcache/movinggc.c +++ b/drivers/md/bcache/movinggc.c @@ -110,7 +110,7 @@ static void write_moving(struct closure *cl) bkey_copy(&s->op.replace, &io->w->key); closure_init(&s->op.cl, cl); - bch_insert_data(&s->op.cl); + bch_data_insert(&s->op.cl); } continue_at(cl, write_moving_finish, NULL); diff --git a/drivers/md/bcache/request.c b/drivers/md/bcache/request.c index 4febe4d863f3..65c7c221e836 100644 --- a/drivers/md/bcache/request.c +++ b/drivers/md/bcache/request.c @@ -25,6 +25,8 @@ struct kmem_cache *bch_search_cache; +static void bch_data_insert_start(struct closure *); + /* Cgroup interface */ #ifdef CONFIG_CGROUP_BCACHE @@ -211,31 +213,42 @@ static void bio_csum(struct bio *bio, struct bkey *k) /* Insert data into cache */ -static void bio_invalidate(struct closure *cl) +static void bch_data_insert_keys(struct closure *cl) { struct btree_op *op = container_of(cl, struct btree_op, cl); - struct bio *bio = op->cache_bio; + struct search *s = container_of(op, struct search, op); - pr_debug("invalidating %i sectors from %llu", - bio_sectors(bio), (uint64_t) bio->bi_sector); + /* + * If we're looping, might already be waiting on + * another journal write - can't wait on more than one journal write at + * a time + * + * XXX: this looks wrong + */ +#if 0 + while (atomic_read(&s->cl.remaining) & CLOSURE_WAITING) + closure_sync(&s->cl); +#endif - while (bio_sectors(bio)) { - unsigned len = min(bio_sectors(bio), 1U << 14); + if (s->write) + op->journal = bch_journal(op->c, &op->keys, + op->flush_journal + ? &s->cl : NULL); - if (bch_keylist_realloc(&op->keys, 0, op->c)) - goto out; + if (bch_btree_insert(op, op->c, &op->keys)) { + s->error = -ENOMEM; + op->insert_data_done = true; + } - bio->bi_sector += len; - bio->bi_size -= len << 9; + if (op->journal) + atomic_dec_bug(op->journal); + op->journal = NULL; - bch_keylist_add(&op->keys, - &KEY(op->inode, bio->bi_sector, len)); - } + if (!op->insert_data_done) + continue_at(cl, bch_data_insert_start, bcache_wq); - op->insert_data_done = true; - bio_put(bio); -out: - continue_at(cl, bch_journal, bcache_wq); + bch_keylist_free(&op->keys); + closure_return(cl); } struct open_bucket { @@ -423,7 +436,34 @@ static bool bch_alloc_sectors(struct bkey *k, unsigned sectors, return true; } -static void bch_insert_data_error(struct closure *cl) +static void bch_data_invalidate(struct closure *cl) +{ + struct btree_op *op = container_of(cl, struct btree_op, cl); + struct bio *bio = op->cache_bio; + + pr_debug("invalidating %i sectors from %llu", + bio_sectors(bio), (uint64_t) bio->bi_sector); + + while (bio_sectors(bio)) { + unsigned len = min(bio_sectors(bio), 1U << 14); + + if (bch_keylist_realloc(&op->keys, 0, op->c)) + goto out; + + bio->bi_sector += len; + bio->bi_size -= len << 9; + + bch_keylist_add(&op->keys, &KEY(op->inode, + bio->bi_sector, len)); + } + + op->insert_data_done = true; + bio_put(bio); +out: + continue_at(cl, bch_data_insert_keys, bcache_wq); +} + +static void bch_data_insert_error(struct closure *cl) { struct btree_op *op = container_of(cl, struct btree_op, cl); @@ -450,10 +490,10 @@ static void bch_insert_data_error(struct closure *cl) op->keys.top = dst; - bch_journal(cl); + bch_data_insert_keys(cl); } -static void bch_insert_data_endio(struct bio *bio, int error) +static void bch_data_insert_endio(struct bio *bio, int error) { struct closure *cl = bio->bi_private; struct btree_op *op = container_of(cl, struct btree_op, cl); @@ -464,7 +504,7 @@ static void bch_insert_data_endio(struct bio *bio, int error) if (s->writeback) s->error = error; else if (s->write) - set_closure_fn(cl, bch_insert_data_error, bcache_wq); + set_closure_fn(cl, bch_data_insert_error, bcache_wq); else set_closure_fn(cl, NULL, NULL); } @@ -472,14 +512,14 @@ static void bch_insert_data_endio(struct bio *bio, int error) bch_bbio_endio(op->c, bio, error, "writing data to cache"); } -static void bch_insert_data_loop(struct closure *cl) +static void bch_data_insert_start(struct closure *cl) { struct btree_op *op = container_of(cl, struct btree_op, cl); struct search *s = container_of(op, struct search, op); struct bio *bio = op->cache_bio, *n; if (op->bypass) - return bio_invalidate(cl); + return bch_data_invalidate(cl); if (atomic_sub_return(bio_sectors(bio), &op->c->sectors_to_gc) < 0) { set_gc_sectors(op->c); @@ -502,7 +542,7 @@ static void bch_insert_data_loop(struct closure *cl) if (bch_keylist_realloc(&op->keys, 1 + (op->csum ? 1 : 0), op->c)) - continue_at(cl, bch_journal, bcache_wq); + continue_at(cl, bch_data_insert_keys, bcache_wq); k = op->keys.top; bkey_init(k); @@ -514,7 +554,7 @@ static void bch_insert_data_loop(struct closure *cl) n = bch_bio_split(bio, KEY_SIZE(k), GFP_NOIO, split); - n->bi_end_io = bch_insert_data_endio; + n->bi_end_io = bch_data_insert_endio; n->bi_private = cl; if (s->writeback) { @@ -537,7 +577,7 @@ static void bch_insert_data_loop(struct closure *cl) } while (n != bio); op->insert_data_done = true; - continue_at(cl, bch_journal, bcache_wq); + continue_at(cl, bch_data_insert_keys, bcache_wq); err: /* bch_alloc_sectors() blocks if s->writeback = true */ BUG_ON(s->writeback); @@ -556,7 +596,7 @@ err: * rest of the write. */ op->bypass = true; - return bio_invalidate(cl); + return bch_data_invalidate(cl); } else { /* * From a cache miss, we can just insert the keys for the data @@ -566,14 +606,14 @@ err: bio_put(bio); if (!bch_keylist_empty(&op->keys)) - continue_at(cl, bch_journal, bcache_wq); + continue_at(cl, bch_data_insert_keys, bcache_wq); else closure_return(cl); } } /** - * bch_insert_data - stick some data in the cache + * bch_data_insert - stick some data in the cache * * This is the starting point for any data to end up in a cache device; it could * be from a normal write, or a writeback write, or a write to a flash only @@ -591,30 +631,13 @@ err: * If op->bypass is true, instead of inserting the data it invalidates the * region of the cache represented by op->cache_bio and op->inode. */ -void bch_insert_data(struct closure *cl) +void bch_data_insert(struct closure *cl) { struct btree_op *op = container_of(cl, struct btree_op, cl); bch_keylist_init(&op->keys); bio_get(op->cache_bio); - bch_insert_data_loop(cl); -} - -void bch_btree_insert_async(struct closure *cl) -{ - struct btree_op *op = container_of(cl, struct btree_op, cl); - struct search *s = container_of(op, struct search, op); - - if (bch_btree_insert(op, op->c, &op->keys)) { - s->error = -ENOMEM; - op->insert_data_done = true; - } - - if (op->insert_data_done) { - bch_keylist_free(&op->keys); - closure_return(cl); - } else - continue_at(cl, bch_insert_data_loop, bcache_wq); + bch_data_insert_start(cl); } /* Common code for the make_request functions */ @@ -969,7 +992,7 @@ static void cached_dev_read_done(struct closure *cl) if (s->op.cache_bio && !test_bit(CACHE_SET_STOPPING, &s->op.c->flags)) { s->op.type = BTREE_REPLACE; - closure_call(&s->op.cl, bch_insert_data, NULL, cl); + closure_call(&s->op.cl, bch_data_insert, NULL, cl); } continue_at(cl, cached_dev_cache_miss_done, NULL); @@ -1146,13 +1169,13 @@ static void cached_dev_write(struct cached_dev *dc, struct search *s) closure_bio_submit(bio, cl, s->d); } - closure_call(&s->op.cl, bch_insert_data, NULL, cl); + closure_call(&s->op.cl, bch_data_insert, NULL, cl); continue_at(cl, cached_dev_write_complete, NULL); } -static void cached_dev_nodata(struct cached_dev *dc, struct search *s) +static void cached_dev_nodata(struct closure *cl) { - struct closure *cl = &s->cl; + struct search *s = container_of(cl, struct search, cl); struct bio *bio = &s->bio.bio; if (s->op.flush_journal) @@ -1185,9 +1208,15 @@ static void cached_dev_make_request(struct request_queue *q, struct bio *bio) s = search_alloc(bio, d); trace_bcache_request_start(s, bio); - if (!bio->bi_size) - cached_dev_nodata(dc, s); - else { + if (!bio->bi_size) { + /* + * can't call bch_journal_meta from under + * generic_make_request + */ + continue_at_nobarrier(&s->cl, + cached_dev_nodata, + bcache_wq); + } else { s->op.bypass = check_should_bypass(dc, s); if (rw) @@ -1274,6 +1303,16 @@ static int flash_dev_cache_miss(struct btree *b, struct search *s, return 0; } +static void flash_dev_nodata(struct closure *cl) +{ + struct search *s = container_of(cl, struct search, cl); + + if (s->op.flush_journal) + bch_journal_meta(s->op.c, cl); + + continue_at(cl, search_free, NULL); +} + static void flash_dev_make_request(struct request_queue *q, struct bio *bio) { struct search *s; @@ -1293,8 +1332,13 @@ static void flash_dev_make_request(struct request_queue *q, struct bio *bio) trace_bcache_request_start(s, bio); if (!bio->bi_size) { - if (s->op.flush_journal) - bch_journal_meta(s->op.c, cl); + /* + * can't call bch_journal_meta from under + * generic_make_request + */ + continue_at_nobarrier(&s->cl, + flash_dev_nodata, + bcache_wq); } else if (rw) { bch_keybuf_check_overlapping(&s->op.c->moving_gc_keys, &KEY(d->id, bio->bi_sector, 0), @@ -1304,7 +1348,7 @@ static void flash_dev_make_request(struct request_queue *q, struct bio *bio) s->writeback = true; s->op.cache_bio = bio; - closure_call(&s->op.cl, bch_insert_data, NULL, cl); + closure_call(&s->op.cl, bch_data_insert, NULL, cl); } else { closure_call(&s->op.cl, btree_read_async, NULL, cl); } diff --git a/drivers/md/bcache/request.h b/drivers/md/bcache/request.h index 57dc4784f4f4..1f1b59d38db5 100644 --- a/drivers/md/bcache/request.h +++ b/drivers/md/bcache/request.h @@ -31,8 +31,7 @@ struct search { void bch_cache_read_endio(struct bio *, int); unsigned bch_get_congested(struct cache_set *); -void bch_insert_data(struct closure *cl); -void bch_btree_insert_async(struct closure *); +void bch_data_insert(struct closure *cl); void bch_cache_read_endio(struct bio *, int); void bch_open_buckets_free(struct cache_set *);