int page_errors; /* errno from get_user_pages() */
/* BIO completion state */
- atomic_t refcount; /* direct_io_worker() and bios */
spinlock_t bio_lock; /* protects BIO fields below */
+ unsigned long refcount; /* direct_io_worker() and bios */
struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */
{
ssize_t transferred = 0;
+ /*
+ * AIO submission can race with bio completion to get here while
+ * expecting to have the last io completed by bio completion.
+ * In that case -EIOCBQUEUED is in fact not an error we want
+ * to preserve through this call.
+ */
+ if (ret == -EIOCBQUEUED)
+ ret = 0;
+
if (dio->result) {
transferred = dio->result;
return ret;
}
-/*
- * Called when a BIO has been processed. If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
- */
-static void dio_complete_aio(struct dio *dio)
-{
- unsigned long flags;
- int ret;
-
- ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, ret, 0);
- kfree(dio);
- } else {
- /*
- * Falling back to buffered
- */
- spin_lock_irqsave(&dio->bio_lock, flags);
- if (dio->waiter)
- wake_up_process(dio->waiter);
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- }
-}
-
static int dio_bio_complete(struct dio *dio, struct bio *bio);
/*
* Asynchronous IO callback.
static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
{
struct dio *dio = bio->bi_private;
+ unsigned long remaining;
+ unsigned long flags;
if (bio->bi_size)
return 1;
/* cleanup the bio */
dio_bio_complete(dio, bio);
- if (atomic_dec_and_test(&dio->refcount))
- dio_complete_aio(dio);
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ remaining = --dio->refcount;
+ if (remaining == 1 && dio->waiter)
+ wake_up_process(dio->waiter);
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+
+ if (remaining == 0) {
+ int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ }
return 0;
}
spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list;
dio->bio_list = bio;
- if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
+ if (--dio->refcount == 1 && dio->waiter)
wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
return 0;
static void dio_bio_submit(struct dio *dio)
{
struct bio *bio = dio->bio;
+ unsigned long flags;
bio->bi_private = dio;
- atomic_inc(&dio->refcount);
+
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ dio->refcount++;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
+
submit_bio(dio->rw, bio);
dio->bio = NULL;
page_cache_release(dio_get_page(dio));
}
-static int wait_for_more_bios(struct dio *dio)
-{
- assert_spin_locked(&dio->bio_lock);
-
- return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
-}
-
/*
* Wait for the next BIO to complete. Remove it and return it. NULL is
* returned once all BIOs have been completed. This must only be called once
struct bio *bio = NULL;
spin_lock_irqsave(&dio->bio_lock, flags);
- while (wait_for_more_bios(dio)) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- if (wait_for_more_bios(dio)) {
- dio->waiter = current;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->waiter = NULL;
- }
- set_current_state(TASK_RUNNING);
+
+ /*
+ * Wait as long as the list is empty and there are bios in flight. bio
+ * completion drops the count, maybe adds to the list, and wakes while
+ * holding the bio_lock so we don't need set_current_state()'s barrier
+ * and can call it after testing our condition.
+ */
+ while (dio->refcount > 1 && dio->bio_list == NULL) {
+ __set_current_state(TASK_UNINTERRUPTIBLE);
+ dio->waiter = current;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
+ io_schedule();
+ /* wake up sets us TASK_RUNNING */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ dio->waiter = NULL;
}
if (dio->bio_list) {
bio = dio->bio_list;
* Wait on and process all in-flight BIOs. This must only be called once
* all bios have been issued so that the refcount can only decrease.
* This just waits for all bios to make it through dio_bio_complete. IO
- * errors are propogated through dio->io_error and should be propogated via
+ * errors are propagated through dio->io_error and should be propagated via
* dio_complete().
*/
static void dio_await_completion(struct dio *dio)
do_holes:
/* Handle holes */
if (!buffer_mapped(map_bh)) {
- char *kaddr;
loff_t i_size_aligned;
/* AKPM: eargh, -ENOTBLK is a hack */
page_cache_release(page);
goto out;
}
- kaddr = kmap_atomic(page, KM_USER0);
- memset(kaddr + (block_in_page << blkbits),
- 0, 1 << blkbits);
- flush_dcache_page(page);
- kunmap_atomic(kaddr, KM_USER0);
+ zero_user_page(page, block_in_page << blkbits,
+ 1 << blkbits, KM_USER0);
dio->block_in_file++;
block_in_page++;
goto next_block;
struct dio *dio)
{
unsigned long user_addr;
+ unsigned long flags;
int seg;
ssize_t ret = 0;
ssize_t ret2;
dio->iocb = iocb;
dio->i_size = i_size_read(inode);
- atomic_set(&dio->refcount, 1);
spin_lock_init(&dio->bio_lock);
+ dio->refcount = 1;
dio->bio_list = NULL;
dio->waiter = NULL;
mutex_unlock(&dio->inode->i_mutex);
/*
- * OK, all BIOs are submitted, so we can decrement bio_count to truly
- * reflect the number of to-be-processed BIOs.
+ * The only time we want to leave bios in flight is when a successful
+ * partial aio read or full aio write have been setup. In that case
+ * bio completion will call aio_complete. The only time it's safe to
+ * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+ * This had *better* be the only place that raises -EIOCBQUEUED.
*/
- if (dio->is_async) {
- int should_wait = 0;
+ BUG_ON(ret == -EIOCBQUEUED);
+ if (dio->is_async && ret == 0 && dio->result &&
+ ((rw & READ) || (dio->result == dio->size)))
+ ret = -EIOCBQUEUED;
- if (dio->result < dio->size && (rw & WRITE)) {
- dio->waiter = current;
- should_wait = 1;
- }
- if (ret == 0)
- ret = dio->result;
-
- /* this can free the dio */
- if (atomic_dec_and_test(&dio->refcount))
- dio_complete_aio(dio);
-
- if (should_wait) {
- unsigned long flags;
- /*
- * Wait for already issued I/O to drain out and
- * release its references to user-space pages
- * before returning to fallback on buffered I/O
- */
-
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- while (atomic_read(&dio->refcount)) {
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- io_schedule();
- spin_lock_irqsave(&dio->bio_lock, flags);
- set_current_state(TASK_UNINTERRUPTIBLE);
- }
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- set_current_state(TASK_RUNNING);
- kfree(dio);
- }
- } else {
+ if (ret != -EIOCBQUEUED)
dio_await_completion(dio);
- ret = dio_complete(dio, offset, ret);
+ /*
+ * Sync will always be dropping the final ref and completing the
+ * operation. AIO can if it was a broken operation described above or
+ * in fact if all the bios race to complete before we get here. In
+ * that case dio_complete() translates the EIOCBQUEUED into the proper
+ * return code that the caller will hand to aio_complete().
+ *
+ * This is managed by the bio_lock instead of being an atomic_t so that
+ * completion paths can drop their ref and use the remaining count to
+ * decide to wake the submission path atomically.
+ */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ ret2 = --dio->refcount;
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
- /* We could have also come here on an AIO file extend */
- if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
- ret >= 0 && dio->result == dio->size)
- /*
- * For AIO writes where we have completed the
- * i/o, we have to mark the the aio complete.
- */
- aio_complete(iocb, ret, 0);
+ if (ret2 == 0) {
+ ret = dio_complete(dio, offset, ret);
+ kfree(dio);
+ } else
+ BUG_ON(ret != -EIOCBQUEUED);
- if (atomic_dec_and_test(&dio->refcount))
- kfree(dio);
- else
- BUG();
- }
return ret;
}