]> git.karo-electronics.de Git - karo-tx-linux.git/commitdiff
block, aio: batch completion for bios/kiocbs
authorKent Overstreet <koverstreet@google.com>
Thu, 23 May 2013 00:38:17 +0000 (10:38 +1000)
committerStephen Rothwell <sfr@canb.auug.org.au>
Mon, 27 May 2013 06:22:05 +0000 (16:22 +1000)
When completing a kiocb, there's some fixed overhead from touching the
kioctx's ring buffer the kiocb belongs to.  Some newer high end block
devices can complete multiple IOs per interrupt, much like many network
interfaces have been for some time.

This plumbs through infrastructure so we can take advantage of multiple
completions at the interrupt level, and complete multiple kiocbs at the
same time.

Drivers have to be converted to take advantage of this, but it's a simple
change and the next patches will convert a few drivers.

To use it, an interrupt handler (or any code that completes bios or
requests) declares and initializes a struct batch_complete:

struct batch_complete batch;
batch_complete_init(&batch);

Then, instead of calling bio_endio(), it calls
bio_endio_batch(bio, err, &batch). This just adds the bio to a list in
the batch_complete.

At the end, it calls

batch_complete(&batch);

This completes all the bios all at once, building up a list of kiocbs;
then the list of kiocbs are completed all at once.

[akpm@linux-foundation.org: fix warning]
[akpm@linux-foundation.org: fs/aio.c needs bio.h, move bio_endio_batch() declaration somewhere rational]
[akpm@linux-foundation.org: fix warnings]
[minchan@kernel.org: fix build error due to bio_endio_batch]
[akpm@linux-foundation.org: fix tracepoint in batch_complete()]
[akpm@linux-foundation.org: coding-style fixes]
Signed-off-by: Kent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Cc: Theodore Ts'o <tytso@mit.edu>
Signed-off-by: Minchan Kim <minchan@kernel.org>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
13 files changed:
block/blk-core.c
block/blk-flush.c
block/blk.h
drivers/block/swim3.c
drivers/md/dm.c
fs/aio.c
fs/bio.c
fs/direct-io.c
include/linux/aio.h
include/linux/batch_complete.h [new file with mode: 0644]
include/linux/bio.h
include/linux/blk_types.h
include/linux/blkdev.h

index 0852e5d43436646dab68878ef69c0930dfe28a4d..25063acb5bddc0ce6842c7d59a38b6cf65e1cfe2 100644 (file)
@@ -153,7 +153,8 @@ void blk_rq_init(struct request_queue *q, struct request *rq)
 EXPORT_SYMBOL(blk_rq_init);
 
 static void req_bio_endio(struct request *rq, struct bio *bio,
-                         unsigned int nbytes, int error)
+                         unsigned int nbytes, int error,
+                         struct batch_complete *batch)
 {
        if (error)
                clear_bit(BIO_UPTODATE, &bio->bi_flags);
@@ -167,7 +168,7 @@ static void req_bio_endio(struct request *rq, struct bio *bio,
 
        /* don't actually finish bio if it's part of flush sequence */
        if (bio->bi_size == 0 && !(rq->cmd_flags & REQ_FLUSH_SEQ))
-               bio_endio(bio, error);
+               bio_endio_batch(bio, error, batch);
 }
 
 void blk_dump_rq_flags(struct request *rq, char *msg)
@@ -2281,7 +2282,8 @@ EXPORT_SYMBOL(blk_fetch_request);
  *     %false - this request doesn't have any more data
  *     %true  - this request has more data
  **/
-bool blk_update_request(struct request *req, int error, unsigned int nr_bytes)
+bool blk_update_request(struct request *req, int error, unsigned int nr_bytes,
+                       struct batch_complete *batch)
 {
        int total_bytes;
 
@@ -2337,7 +2339,7 @@ bool blk_update_request(struct request *req, int error, unsigned int nr_bytes)
                if (bio_bytes == bio->bi_size)
                        req->bio = bio->bi_next;
 
-               req_bio_endio(req, bio, bio_bytes, error);
+               req_bio_endio(req, bio, bio_bytes, error, batch);
 
                total_bytes += bio_bytes;
                nr_bytes -= bio_bytes;
@@ -2390,14 +2392,15 @@ EXPORT_SYMBOL_GPL(blk_update_request);
 
 static bool blk_update_bidi_request(struct request *rq, int error,
                                    unsigned int nr_bytes,
-                                   unsigned int bidi_bytes)
+                                   unsigned int bidi_bytes,
+                                   struct batch_complete *batch)
 {
-       if (blk_update_request(rq, error, nr_bytes))
+       if (blk_update_request(rq, error, nr_bytes, batch))
                return true;
 
        /* Bidi request must be completed as a whole */
        if (unlikely(blk_bidi_rq(rq)) &&
-           blk_update_request(rq->next_rq, error, bidi_bytes))
+           blk_update_request(rq->next_rq, error, bidi_bytes, batch))
                return true;
 
        if (blk_queue_add_random(rq->q))
@@ -2480,7 +2483,7 @@ static bool blk_end_bidi_request(struct request *rq, int error,
        struct request_queue *q = rq->q;
        unsigned long flags;
 
-       if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes))
+       if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes, NULL))
                return true;
 
        spin_lock_irqsave(q->queue_lock, flags);
@@ -2506,9 +2509,10 @@ static bool blk_end_bidi_request(struct request *rq, int error,
  *     %true  - still buffers pending for this request
  **/
 bool __blk_end_bidi_request(struct request *rq, int error,
-                                  unsigned int nr_bytes, unsigned int bidi_bytes)
+                                  unsigned int nr_bytes, unsigned int bidi_bytes,
+                                  struct batch_complete *batch)
 {
-       if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes))
+       if (blk_update_bidi_request(rq, error, nr_bytes, bidi_bytes, batch))
                return true;
 
        blk_finish_request(rq, error);
@@ -2609,7 +2613,7 @@ EXPORT_SYMBOL_GPL(blk_end_request_err);
  **/
 bool __blk_end_request(struct request *rq, int error, unsigned int nr_bytes)
 {
-       return __blk_end_bidi_request(rq, error, nr_bytes, 0);
+       return __blk_end_bidi_request(rq, error, nr_bytes, 0, NULL);
 }
 EXPORT_SYMBOL(__blk_end_request);
 
@@ -2621,7 +2625,7 @@ EXPORT_SYMBOL(__blk_end_request);
  * Description:
  *     Completely finish @rq.  Must be called with queue lock held.
  */
-void __blk_end_request_all(struct request *rq, int error)
+void blk_end_request_all_batch(struct request *rq, int error, struct batch_complete *batch)
 {
        bool pending;
        unsigned int bidi_bytes = 0;
@@ -2629,10 +2633,10 @@ void __blk_end_request_all(struct request *rq, int error)
        if (unlikely(blk_bidi_rq(rq)))
                bidi_bytes = blk_rq_bytes(rq->next_rq);
 
-       pending = __blk_end_bidi_request(rq, error, blk_rq_bytes(rq), bidi_bytes);
+       pending = __blk_end_bidi_request(rq, error, blk_rq_bytes(rq), bidi_bytes, batch);
        BUG_ON(pending);
 }
-EXPORT_SYMBOL(__blk_end_request_all);
+EXPORT_SYMBOL(blk_end_request_all_batch);
 
 /**
  * __blk_end_request_cur - Helper function to finish the current request chunk.
index 762cfcac839558eb720c3aee92ab0921f0e2dd9e..ab0ed23589477f2104686011764fb2565fa69b35 100644 (file)
@@ -316,7 +316,7 @@ void blk_insert_flush(struct request *rq)
         * complete the request.
         */
        if (!policy) {
-               __blk_end_bidi_request(rq, 0, 0, 0);
+               __blk_end_bidi_request(rq, 0, 0, 0, NULL);
                return;
        }
 
index e837b8f619b7d646825d43ea5ddeb9beacbea3d2..dc8fee6d41d669401e6adf4988755e74519c2810 100644 (file)
@@ -31,7 +31,8 @@ void blk_queue_bypass_end(struct request_queue *q);
 void blk_dequeue_request(struct request *rq);
 void __blk_queue_free_tags(struct request_queue *q);
 bool __blk_end_bidi_request(struct request *rq, int error,
-                           unsigned int nr_bytes, unsigned int bidi_bytes);
+                           unsigned int nr_bytes, unsigned int bidi_bytes,
+                           struct batch_complete *batch);
 
 void blk_rq_timed_out_timer(unsigned long data);
 void blk_delete_timer(struct request *);
index 20e061c3e02329c0662b4ce8fac0ad2b82d9c8ce..9282e66b75472868bb3d8024a521409d22e39e49 100644 (file)
@@ -775,7 +775,7 @@ static irqreturn_t swim3_interrupt(int irq, void *dev_id)
                if (intr & ERROR_INTR) {
                        n = fs->scount - 1 - resid / 512;
                        if (n > 0) {
-                               blk_update_request(req, 0, n << 9);
+                               blk_update_request(req, 0, n << 9, NULL);
                                fs->req_sector += n;
                        }
                        if (fs->retries < 5) {
index 910112429ab8452b7888f6a63f21358b73310223..290106082244545a139b33d6d64e851ccd1c0130 100644 (file)
@@ -696,7 +696,7 @@ static void end_clone_bio(struct bio *clone, int error,
         * Do not use blk_end_request() here, because it may complete
         * the original request before the clone, and break the ordering.
         */
-       blk_update_request(tio->orig, 0, nr_bytes);
+       blk_update_request(tio->orig, 0, nr_bytes, NULL);
 }
 
 /*
index 5e1f14a605f3e318b7f9af3860295c04447a9514..923ba00cdd4d0fd581b4a811a58d0dcd4b821622 100644 (file)
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -25,6 +25,7 @@
 #include <linux/file.h>
 #include <linux/mm.h>
 #include <linux/mman.h>
+#include <linux/bio.h>
 #include <linux/mmu_context.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
@@ -676,71 +677,11 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
        return ret;
 }
 
-/* aio_complete
- *     Called when the io request on the given iocb is complete.
- */
-void aio_complete(struct kiocb *iocb, long res, long res2)
+static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req,
+                                      unsigned tail)
 {
-       struct kioctx   *ctx = iocb->ki_ctx;
-       struct aio_ring *ring;
        struct io_event *ev_page, *event;
-       unsigned long   flags;
-       unsigned tail, pos;
-
-       /*
-        * Special case handling for sync iocbs:
-        *  - events go directly into the iocb for fast handling
-        *  - the sync task with the iocb in its stack holds the single iocb
-        *    ref, no other paths have a way to get another ref
-        *  - the sync task helpfully left a reference to itself in the iocb
-        */
-       if (is_sync_kiocb(iocb)) {
-               BUG_ON(atomic_read(&iocb->ki_users) != 1);
-               iocb->ki_user_data = res;
-               atomic_set(&iocb->ki_users, 0);
-               wake_up_process(iocb->ki_obj.tsk);
-               return;
-       }
-
-       /*
-        * Take rcu_read_lock() in case the kioctx is being destroyed, as we
-        * need to issue a wakeup after incrementing reqs_available.
-        */
-       rcu_read_lock();
-
-       if (iocb->ki_list.next) {
-               unsigned long flags;
-
-               spin_lock_irqsave(&ctx->ctx_lock, flags);
-               list_del(&iocb->ki_list);
-               spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-       }
-
-       /*
-        * cancelled requests don't get events, userland was given one
-        * when the event got cancelled.
-        */
-       if (unlikely(xchg(&iocb->ki_cancel,
-                         KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
-               /*
-                * Can't use the percpu reqs_available here - could race with
-                * free_ioctx()
-                */
-               atomic_inc(&ctx->reqs_available);
-               smp_mb__after_atomic_inc();
-               /* Still need the wake_up in case free_ioctx is waiting */
-               goto put_rq;
-       }
-
-       /*
-        * Add a completion event to the ring buffer; ctx->tail is both our lock
-        * and the canonical version of the tail pointer.
-        */
-       local_irq_save(flags);
-       while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
-               cpu_relax();
-
-       pos = tail + AIO_EVENTS_OFFSET;
+       unsigned pos = tail + AIO_EVENTS_OFFSET;
 
        if (++tail >= ctx->nr_events)
                tail = 0;
@@ -748,22 +689,44 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
        ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
        event = ev_page + pos % AIO_EVENTS_PER_PAGE;
 
-       event->obj = (u64)(unsigned long)iocb->ki_obj.user;
-       event->data = iocb->ki_user_data;
-       event->res res;
-       event->res2 res2;
+       event->obj      = (u64)(unsigned long)req->ki_obj.user;
+       event->data     = req->ki_user_data;
+       event->res      = req->ki_res;
+       event->res2     = req->ki_res2;
 
        kunmap_atomic(ev_page);
        flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
 
        pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
-                ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
-                res, res2);
+                ctx, tail, req, req->ki_obj.user, req->ki_user_data,
+                req->ki_res, req->ki_res2);
+
+       return tail;
+}
 
-       /* after flagging the request as done, we
-        * must never even look at it again
+static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
+{
+       unsigned tail;
+
+       /*
+        * ctx->tail is both our lock and the canonical version of the tail
+        * pointer.
         */
-       smp_wmb();      /* make event visible before updating tail */
+       while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
+               cpu_relax();
+
+       return tail;
+}
+
+static inline void kioctx_ring_unlock(struct kioctx *ctx, unsigned tail)
+{
+       struct aio_ring *ring;
+
+       if (!ctx)
+               return;
+
+       smp_wmb();
+       /* make event visible before updating tail */
 
        ctx->shadow_tail = tail;
 
@@ -776,28 +739,156 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
        smp_mb();
 
        ctx->tail = tail;
-       local_irq_restore(flags);
 
-       pr_debug("added to ring %p at [%u]\n", iocb, tail);
+       if (waitqueue_active(&ctx->wait))
+               wake_up(&ctx->wait);
+}
+
+void batch_complete_aio(struct batch_complete *batch)
+{
+       struct kioctx *ctx = NULL;
+       struct eventfd_ctx *eventfd = NULL;
+       struct rb_node *n;
+       unsigned long flags;
+       unsigned tail = 0;
+
+       if (RB_EMPTY_ROOT(&batch->kiocb))
+               return;
+
+       /*
+        * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+        * need to issue a wakeup after incrementing reqs_available.
+        */
+       rcu_read_lock();
+       local_irq_save(flags);
+
+       n = rb_first(&batch->kiocb);
+       while (n) {
+               struct kiocb *req = container_of(n, struct kiocb, ki_node);
+
+               if (n->rb_right) {
+                       n->rb_right->__rb_parent_color = n->__rb_parent_color;
+                       n = n->rb_right;
+
+                       while (n->rb_left)
+                               n = n->rb_left;
+               } else {
+                       n = rb_parent(n);
+               }
+
+               if (unlikely(xchg(&req->ki_cancel,
+                                 KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+                       /*
+                        * Can't use the percpu reqs_available here - could race
+                        * with free_ioctx()
+                        */
+                       atomic_inc(&req->ki_ctx->reqs_available);
+                       aio_put_req(req);
+                       continue;
+               }
+
+               if (unlikely(req->ki_eventfd != eventfd)) {
+                       if (eventfd) {
+                               /* Make event visible */
+                               kioctx_ring_unlock(ctx, tail);
+                               ctx = NULL;
+
+                               eventfd_signal(eventfd, 1);
+                               eventfd_ctx_put(eventfd);
+                       }
+
+                       eventfd = req->ki_eventfd;
+                       req->ki_eventfd = NULL;
+               }
+
+               if (unlikely(req->ki_ctx != ctx)) {
+                       kioctx_ring_unlock(ctx, tail);
+
+                       ctx = req->ki_ctx;
+                       tail = kioctx_ring_lock(ctx);
+               }
+
+               tail = kioctx_ring_put(ctx, req, tail);
+               aio_put_req(req);
+       }
+
+       kioctx_ring_unlock(ctx, tail);
+       local_irq_restore(flags);
+       rcu_read_unlock();
 
        /*
         * Check if the user asked us to deliver the result through an
         * eventfd. The eventfd_signal() function is safe to be called
         * from IRQ context.
         */
-       if (iocb->ki_eventfd != NULL)
-               eventfd_signal(iocb->ki_eventfd, 1);
+       if (eventfd) {
+               eventfd_signal(eventfd, 1);
+               eventfd_ctx_put(eventfd);
+       }
+}
+EXPORT_SYMBOL(batch_complete_aio);
 
-put_rq:
-       /* everything turned out well, dispose of the aiocb. */
-       aio_put_req(iocb);
+/* aio_complete_batch
+ *     Called when the io request on the given iocb is complete; @batch may be
+ *     NULL.
+ */
+void aio_complete_batch(struct kiocb *req, long res, long res2,
+                       struct batch_complete *batch)
+{
+       req->ki_res = res;
+       req->ki_res2 = res2;
 
-       if (waitqueue_active(&ctx->wait))
-               wake_up(&ctx->wait);
+       if (req->ki_list.next) {
+               struct kioctx *ctx = req->ki_ctx;
+               unsigned long flags;
 
-       rcu_read_unlock();
+               spin_lock_irqsave(&ctx->ctx_lock, flags);
+               list_del(&req->ki_list);
+               spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+       }
+
+       /*
+        * Special case handling for sync iocbs:
+        *  - events go directly into the iocb for fast handling
+        *  - the sync task with the iocb in its stack holds the single iocb
+        *    ref, no other paths have a way to get another ref
+        *  - the sync task helpfully left a reference to itself in the iocb
+        */
+       if (is_sync_kiocb(req)) {
+               BUG_ON(atomic_read(&req->ki_users) != 1);
+               req->ki_user_data = req->ki_res;
+               atomic_set(&req->ki_users, 0);
+               wake_up_process(req->ki_obj.tsk);
+       } else if (batch) {
+               int res;
+               struct kiocb *t;
+               struct rb_node **n = &batch->kiocb.rb_node, *parent = NULL;
+
+               while (*n) {
+                       parent = *n;
+                       t = container_of(*n, struct kiocb, ki_node);
+
+                       res = req->ki_ctx != t->ki_ctx
+                               ? req->ki_ctx < t->ki_ctx
+                               : req->ki_eventfd != t->ki_eventfd
+                               ? req->ki_eventfd < t->ki_eventfd
+                               : req < t;
+
+                       n = res ? &(*n)->rb_left : &(*n)->rb_right;
+               }
+
+               rb_link_node(&req->ki_node, parent, n);
+               rb_insert_color(&req->ki_node, &batch->kiocb);
+       } else {
+               struct batch_complete batch_stack;
+
+               memset(&req->ki_node, 0, sizeof(req->ki_node));
+               batch_stack.kiocb.rb_node = &req->ki_node;
+
+               batch_complete_aio(&batch_stack);
+       }
 }
-EXPORT_SYMBOL(aio_complete);
+EXPORT_SYMBOL(aio_complete_batch);
 
 /* aio_read_events
  *     Pull an event off of the ioctx's event ring.  Returns the number of
index e08290740cac281bd0358198db4c519eeb974e51..89cf9776f7ab35789ab096ca48292617bd26a1dd 100644 (file)
--- a/fs/bio.c
+++ b/fs/bio.c
@@ -28,6 +28,7 @@
 #include <linux/mempool.h>
 #include <linux/workqueue.h>
 #include <linux/cgroup.h>
+#include <linux/aio.h>
 #include <scsi/sg.h>           /* for struct sg_iovec */
 
 #include <trace/events/block.h>
@@ -1688,31 +1689,40 @@ void bio_flush_dcache_pages(struct bio *bi)
 EXPORT_SYMBOL(bio_flush_dcache_pages);
 #endif
 
-/**
- * bio_endio - end I/O on a bio
- * @bio:       bio
- * @error:     error, if any
- *
- * Description:
- *   bio_endio() will end I/O on the whole bio. bio_endio() is the
- *   preferred way to end I/O on a bio, it takes care of clearing
- *   BIO_UPTODATE on error. @error is 0 on success, and and one of the
- *   established -Exxxx (-EIO, for instance) error values in case
- *   something went wrong. No one should call bi_end_io() directly on a
- *   bio unless they own it and thus know that it has an end_io
- *   function.
- **/
-void bio_endio(struct bio *bio, int error)
+static inline void __bio_endio(struct bio *bio, struct batch_complete *batch)
 {
-       if (error)
+       if (bio->bi_error)
                clear_bit(BIO_UPTODATE, &bio->bi_flags);
        else if (!test_bit(BIO_UPTODATE, &bio->bi_flags))
-               error = -EIO;
+               bio->bi_error = -EIO;
 
        if (bio->bi_end_io)
-               bio->bi_end_io(bio, error, NULL);
+               bio->bi_end_io(bio, bio->bi_error, batch);
+}
+
+void bio_endio_batch(struct bio *bio, int error, struct batch_complete *batch)
+{
+       if (error)
+               bio->bi_error = error;
+
+       if (batch)
+               bio_list_add(&batch->bio, bio);
+       else
+               __bio_endio(bio, batch);
+
+}
+EXPORT_SYMBOL(bio_endio_batch);
+
+void batch_complete(struct batch_complete *batch)
+{
+       struct bio *bio;
+
+       while ((bio = bio_list_pop(&batch->bio)))
+               __bio_endio(bio, batch);
+
+       batch_complete_aio(batch);
 }
-EXPORT_SYMBOL(bio_endio);
+EXPORT_SYMBOL(batch_complete);
 
 void bio_pair_release(struct bio_pair *bp)
 {
index 331fd5cb005c04ccc31f9d0451fc7a76e4cf8ddf..b8707bf52b82f49d186bd8505b57adcf580a085a 100644 (file)
@@ -230,7 +230,8 @@ static inline struct page *dio_get_page(struct dio *dio,
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async,
+                           struct batch_complete *batch)
 {
        ssize_t transferred = 0;
 
@@ -264,7 +265,7 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
        } else {
                inode_dio_done(dio->inode);
                if (is_async)
-                       aio_complete(dio->iocb, ret, 0);
+                       aio_complete_batch(dio->iocb, ret, 0, batch);
        }
 
        return ret;
@@ -274,7 +275,7 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio);
 /*
  * Asynchronous IO callback. 
  */
-static void dio_bio_end_aio(struct bio *bio, int error)
+static void dio_bio_end_aio(struct bio *bio, int error, struct batch_complete *batch)
 {
        struct dio *dio = bio->bi_private;
        unsigned long remaining;
@@ -290,7 +291,7 @@ static void dio_bio_end_aio(struct bio *bio, int error)
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 
        if (remaining == 0) {
-               dio_complete(dio, dio->iocb->ki_pos, 0, true);
+               dio_complete(dio, dio->iocb->ki_pos, 0, true, batch);
                kmem_cache_free(dio_cache, dio);
        }
 }
@@ -1265,7 +1266,7 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
                dio_await_completion(dio);
 
        if (drop_refcount(dio) == 0) {
-               retval = dio_complete(dio, offset, retval, false);
+               retval = dio_complete(dio, offset, retval, false, NULL);
                kmem_cache_free(dio_cache, dio);
        } else
                BUG_ON(retval != -EIOCBQUEUED);
index 1bdf965339f9bef4222a5bc739efc0b23807e35e..a7e4c595825e811cbc98e2aa701ddf50c417a2a8 100644 (file)
@@ -6,11 +6,12 @@
 #include <linux/aio_abi.h>
 #include <linux/uio.h>
 #include <linux/rcupdate.h>
-
 #include <linux/atomic.h>
+#include <linux/batch_complete.h>
 
 struct kioctx;
 struct kiocb;
+struct batch_complete;
 
 #define KIOCB_KEY              0
 
@@ -30,6 +31,8 @@ struct kiocb;
 typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
 
 struct kiocb {
+       struct rb_node          ki_node;
+
        atomic_t                ki_users;
 
        struct file             *ki_filp;
@@ -43,6 +46,9 @@ struct kiocb {
        } ki_obj;
 
        __u64                   ki_user_data;   /* user's data for completion */
+       long                    ki_res;
+       long                    ki_res2;
+
        loff_t                  ki_pos;
 
        void                    *private;
@@ -85,7 +91,9 @@ static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
 #ifdef CONFIG_AIO
 extern ssize_t wait_on_sync_kiocb(struct kiocb *iocb);
 extern void aio_put_req(struct kiocb *iocb);
-extern void aio_complete(struct kiocb *iocb, long res, long res2);
+extern void batch_complete_aio(struct batch_complete *batch);
+extern void aio_complete_batch(struct kiocb *iocb, long res, long res2,
+                              struct batch_complete *batch);
 struct mm_struct;
 extern void exit_aio(struct mm_struct *mm);
 extern long do_io_submit(aio_context_t ctx_id, long nr,
@@ -94,7 +102,13 @@ void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
 #else
 static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
 static inline void aio_put_req(struct kiocb *iocb) { }
-static inline void aio_complete(struct kiocb *iocb, long res, long res2) { }
+
+static inline void batch_complete_aio(struct batch_complete *batch) { }
+static inline void aio_complete_batch(struct kiocb *iocb, long res, long res2,
+                                     struct batch_complete *batch)
+{
+       return;
+}
 struct mm_struct;
 static inline void exit_aio(struct mm_struct *mm) { }
 static inline long do_io_submit(aio_context_t ctx_id, long nr,
@@ -104,6 +118,11 @@ static inline void kiocb_set_cancel_fn(struct kiocb *req,
                                       kiocb_cancel_fn *cancel) { }
 #endif /* CONFIG_AIO */
 
+static inline void aio_complete(struct kiocb *iocb, long res, long res2)
+{
+       aio_complete_batch(iocb, res, res2, NULL);
+}
+
 static inline struct kiocb *list_kiocb(struct list_head *h)
 {
        return list_entry(h, struct kiocb, ki_list);
diff --git a/include/linux/batch_complete.h b/include/linux/batch_complete.h
new file mode 100644 (file)
index 0000000..8167a9d
--- /dev/null
@@ -0,0 +1,23 @@
+#ifndef _LINUX_BATCH_COMPLETE_H
+#define _LINUX_BATCH_COMPLETE_H
+
+#include <linux/rbtree.h>
+
+/*
+ * Common stuff to the aio and block code for batch completion. Everything
+ * important is elsewhere:
+ */
+
+struct bio;
+
+struct bio_list {
+       struct bio *head;
+       struct bio *tail;
+};
+
+struct batch_complete {
+       struct bio_list         bio;
+       struct rb_root          kiocb;
+};
+
+#endif
index 7f3089ffc87a72a38703f474414b7c0ab15e16b0..5db8a51eebb15d653d60c10fc27bee005a2fa410 100644 (file)
@@ -24,6 +24,7 @@
 #include <linux/mempool.h>
 #include <linux/ioprio.h>
 #include <linux/bug.h>
+#include <linux/batch_complete.h>
 
 #ifdef CONFIG_BLOCK
 
@@ -69,6 +70,8 @@
 #define bio_sectors(bio)       ((bio)->bi_size >> 9)
 #define bio_end_sector(bio)    ((bio)->bi_sector + bio_sectors((bio)))
 
+void bio_endio_batch(struct bio *bio, int error, struct batch_complete *batch);
+
 static inline unsigned int bio_cur_bytes(struct bio *bio)
 {
        if (bio->bi_vcnt)
@@ -252,7 +255,25 @@ static inline struct bio *bio_clone_kmalloc(struct bio *bio, gfp_t gfp_mask)
 
 }
 
-extern void bio_endio(struct bio *, int);
+/**
+ * bio_endio - end I/O on a bio
+ * @bio:       bio
+ * @error:     error, if any
+ *
+ * Description:
+ *   bio_endio() will end I/O on the whole bio. bio_endio() is the
+ *   preferred way to end I/O on a bio, it takes care of clearing
+ *   BIO_UPTODATE on error. @error is 0 on success, and and one of the
+ *   established -Exxxx (-EIO, for instance) error values in case
+ *   something went wrong. No one should call bi_end_io() directly on a
+ *   bio unless they own it and thus know that it has an end_io
+ *   function.
+ **/
+static inline void bio_endio(struct bio *bio, int error)
+{
+       bio_endio_batch(bio, error, NULL);
+}
+
 struct request_queue;
 extern int bio_phys_segments(struct request_queue *, struct bio *);
 
@@ -404,10 +425,6 @@ static inline bool bio_mergeable(struct bio *bio)
  * member of the bio.  The bio_list also caches the last list member to allow
  * fast access to the tail.
  */
-struct bio_list {
-       struct bio *head;
-       struct bio *tail;
-};
 
 static inline int bio_list_empty(const struct bio_list *bl)
 {
@@ -554,6 +571,15 @@ struct biovec_slab {
  */
 #define BIO_SPLIT_ENTRIES 2
 
+static inline void batch_complete_init(struct batch_complete *batch)
+{
+       bio_list_init(&batch->bio);
+       batch->kiocb = RB_ROOT;
+}
+
+void batch_complete(struct batch_complete *batch);
+
+
 #if defined(CONFIG_BLK_DEV_INTEGRITY)
 
 #define bip_vec_idx(bip, idx)  (&(bip->bip_vec[(idx)]))
index b3195e30880d34e7a27895e85535ab3da1ed51d6..9d3cafa6bbcd7c9b53fa3b59335dd29365c97b58 100644 (file)
@@ -43,6 +43,7 @@ struct bio {
                                                 * top bits priority
                                                 */
 
+       short                   bi_error;
        unsigned short          bi_vcnt;        /* how many bio_vec's */
        unsigned short          bi_idx;         /* current index into bvl_vec */
 
index 2fdb4a451b49bd626d9415b231c76b7ac927cf69..ddc2f8058c707a8f200e5a95c1dcc36445ea5056 100644 (file)
@@ -883,7 +883,8 @@ extern struct request *blk_fetch_request(struct request_queue *q);
  * This prevents code duplication in drivers.
  */
 extern bool blk_update_request(struct request *rq, int error,
-                              unsigned int nr_bytes);
+                              unsigned int nr_bytes,
+                              struct batch_complete *batch);
 extern bool blk_end_request(struct request *rq, int error,
                            unsigned int nr_bytes);
 extern void blk_end_request_all(struct request *rq, int error);
@@ -891,10 +892,17 @@ extern bool blk_end_request_cur(struct request *rq, int error);
 extern bool blk_end_request_err(struct request *rq, int error);
 extern bool __blk_end_request(struct request *rq, int error,
                              unsigned int nr_bytes);
-extern void __blk_end_request_all(struct request *rq, int error);
 extern bool __blk_end_request_cur(struct request *rq, int error);
 extern bool __blk_end_request_err(struct request *rq, int error);
 
+extern void blk_end_request_all_batch(struct request *rq, int error,
+                                     struct batch_complete *batch);
+
+static inline void __blk_end_request_all(struct request *rq, int error)
+{
+       blk_end_request_all_batch(rq, error, NULL);
+}
+
 extern void blk_complete_request(struct request *);
 extern void __blk_complete_request(struct request *);
 extern void blk_abort_request(struct request *);