]> git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - fs/aio.c
aio: remove retry-based AIO
[karo-tx-linux.git] / fs / aio.c
index c3ebb98a527ba35c86d4c7568624aa03e26917a6..b9cc89c22b9f19c8298a739d55203cffc795d249 100644 (file)
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -54,11 +54,6 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio request
 static struct kmem_cache       *kiocb_cachep;
 static struct kmem_cache       *kioctx_cachep;
 
-static struct workqueue_struct *aio_wq;
-
-static void aio_kick_handler(struct work_struct *);
-static void aio_queue_work(struct kioctx *);
-
 /* aio_setup
  *     Creates the slab caches used by the aio routines, panic on
  *     failure as this is done early during the boot sequence.
@@ -68,9 +63,6 @@ static int __init aio_setup(void)
        kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
        kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
-       aio_wq = alloc_workqueue("aio", 0, 1);  /* used to limit concurrency */
-       BUG_ON(!aio_wq);
-
        pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
        return 0;
@@ -86,7 +78,6 @@ static void aio_free_ring(struct kioctx *ctx)
                put_page(info->ring_pages[i]);
 
        if (info->mmap_size) {
-               BUG_ON(ctx->mm != current->mm);
                vm_munmap(info->mmap_base, info->mmap_size);
        }
 
@@ -101,6 +92,7 @@ static int aio_setup_ring(struct kioctx *ctx)
        struct aio_ring *ring;
        struct aio_ring_info *info = &ctx->ring_info;
        unsigned nr_events = ctx->max_reqs;
+       struct mm_struct *mm = current->mm;
        unsigned long size, populate;
        int nr_pages;
 
@@ -126,23 +118,22 @@ static int aio_setup_ring(struct kioctx *ctx)
 
        info->mmap_size = nr_pages * PAGE_SIZE;
        dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
-       down_write(&ctx->mm->mmap_sem);
+       down_write(&mm->mmap_sem);
        info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size, 
                                        PROT_READ|PROT_WRITE,
                                        MAP_ANONYMOUS|MAP_PRIVATE, 0,
                                        &populate);
        if (IS_ERR((void *)info->mmap_base)) {
-               up_write(&ctx->mm->mmap_sem);
+               up_write(&mm->mmap_sem);
                info->mmap_size = 0;
                aio_free_ring(ctx);
                return -EAGAIN;
        }
 
        dprintk("mmap address: 0x%08lx\n", info->mmap_base);
-       info->nr_pages = get_user_pages(current, ctx->mm,
-                                       info->mmap_base, nr_pages, 
+       info->nr_pages = get_user_pages(current, mm, info->mmap_base, nr_pages,
                                        1, 0, info->ring_pages, NULL);
-       up_write(&ctx->mm->mmap_sem);
+       up_write(&mm->mmap_sem);
 
        if (unlikely(info->nr_pages != nr_pages)) {
                aio_free_ring(ctx);
@@ -206,10 +197,7 @@ static void __put_ioctx(struct kioctx *ctx)
        unsigned nr_events = ctx->max_reqs;
        BUG_ON(ctx->reqs_active);
 
-       cancel_delayed_work_sync(&ctx->wq);
        aio_free_ring(ctx);
-       mmdrop(ctx->mm);
-       ctx->mm = NULL;
        if (nr_events) {
                spin_lock(&aio_nr_lock);
                BUG_ON(aio_nr - nr_events > aio_nr);
@@ -237,7 +225,7 @@ static inline void put_ioctx(struct kioctx *kioctx)
  */
 static struct kioctx *ioctx_alloc(unsigned nr_events)
 {
-       struct mm_struct *mm;
+       struct mm_struct *mm = current->mm;
        struct kioctx *ctx;
        int err = -ENOMEM;
 
@@ -256,8 +244,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
                return ERR_PTR(-ENOMEM);
 
        ctx->max_reqs = nr_events;
-       mm = ctx->mm = current->mm;
-       atomic_inc(&mm->mm_count);
 
        atomic_set(&ctx->users, 2);
        spin_lock_init(&ctx->ctx_lock);
@@ -265,8 +251,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
        init_waitqueue_head(&ctx->wait);
 
        INIT_LIST_HEAD(&ctx->active_reqs);
-       INIT_LIST_HEAD(&ctx->run_list);
-       INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
 
        if (aio_setup_ring(ctx) < 0)
                goto out_freectx;
@@ -287,14 +271,13 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
        spin_unlock(&mm->ioctx_lock);
 
        dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
-               ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
+               ctx, ctx->user_id, mm, ctx->ring_info.nr);
        return ctx;
 
 out_cleanup:
        err = -EAGAIN;
        aio_free_ring(ctx);
 out_freectx:
-       mmdrop(mm);
        kmem_cache_free(kioctx_cachep, ctx);
        dprintk("aio: error allocating ioctx %d\n", err);
        return ERR_PTR(err);
@@ -391,8 +374,6 @@ void exit_aio(struct mm_struct *mm)
                 * as indicator that it needs to unmap the area,
                 * just set it to 0; aio_free_ring() is the only
                 * place that uses ->mmap_size, so it's safe.
-                * That way we get all munmap done to current->mm -
-                * all other callers have ctx->mm == current->mm.
                 */
                ctx->ring_info.mmap_size = 0;
                put_ioctx(ctx);
@@ -426,7 +407,6 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
        req->ki_dtor = NULL;
        req->private = NULL;
        req->ki_iovec = NULL;
-       INIT_LIST_HEAD(&req->ki_run_list);
        req->ki_eventfd = NULL;
 
        return req;
@@ -611,281 +591,6 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
        return ret;
 }
 
-/*
- * Queue up a kiocb to be retried. Assumes that the kiocb
- * has already been marked as kicked, and places it on
- * the retry run list for the corresponding ioctx, if it
- * isn't already queued. Returns 1 if it actually queued
- * the kiocb (to tell the caller to activate the work
- * queue to process it), or 0, if it found that it was
- * already queued.
- */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
-{
-       struct kioctx *ctx = iocb->ki_ctx;
-
-       assert_spin_locked(&ctx->ctx_lock);
-
-       if (list_empty(&iocb->ki_run_list)) {
-               list_add_tail(&iocb->ki_run_list,
-                       &ctx->run_list);
-               return 1;
-       }
-       return 0;
-}
-
-/* aio_run_iocb
- *     This is the core aio execution routine. It is
- *     invoked both for initial i/o submission and
- *     subsequent retries via the aio_kick_handler.
- *     Expects to be invoked with iocb->ki_ctx->lock
- *     already held. The lock is released and reacquired
- *     as needed during processing.
- *
- * Calls the iocb retry method (already setup for the
- * iocb on initial submission) for operation specific
- * handling, but takes care of most of common retry
- * execution details for a given iocb. The retry method
- * needs to be non-blocking as far as possible, to avoid
- * holding up other iocbs waiting to be serviced by the
- * retry kernel thread.
- *
- * The trickier parts in this code have to do with
- * ensuring that only one retry instance is in progress
- * for a given iocb at any time. Providing that guarantee
- * simplifies the coding of individual aio operations as
- * it avoids various potential races.
- */
-static ssize_t aio_run_iocb(struct kiocb *iocb)
-{
-       struct kioctx   *ctx = iocb->ki_ctx;
-       ssize_t (*retry)(struct kiocb *);
-       ssize_t ret;
-
-       if (!(retry = iocb->ki_retry)) {
-               printk("aio_run_iocb: iocb->ki_retry = NULL\n");
-               return 0;
-       }
-
-       /*
-        * We don't want the next retry iteration for this
-        * operation to start until this one has returned and
-        * updated the iocb state. However, wait_queue functions
-        * can trigger a kick_iocb from interrupt context in the
-        * meantime, indicating that data is available for the next
-        * iteration. We want to remember that and enable the
-        * next retry iteration _after_ we are through with
-        * this one.
-        *
-        * So, in order to be able to register a "kick", but
-        * prevent it from being queued now, we clear the kick
-        * flag, but make the kick code *think* that the iocb is
-        * still on the run list until we are actually done.
-        * When we are done with this iteration, we check if
-        * the iocb was kicked in the meantime and if so, queue
-        * it up afresh.
-        */
-
-       kiocbClearKicked(iocb);
-
-       /*
-        * This is so that aio_complete knows it doesn't need to
-        * pull the iocb off the run list (We can't just call
-        * INIT_LIST_HEAD because we don't want a kick_iocb to
-        * queue this on the run list yet)
-        */
-       iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
-       spin_unlock_irq(&ctx->ctx_lock);
-
-       /* Quit retrying if the i/o has been cancelled */
-       if (kiocbIsCancelled(iocb)) {
-               ret = -EINTR;
-               aio_complete(iocb, ret, 0);
-               /* must not access the iocb after this */
-               goto out;
-       }
-
-       /*
-        * Now we are all set to call the retry method in async
-        * context.
-        */
-       ret = retry(iocb);
-
-       if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
-               /*
-                * There's no easy way to restart the syscall since other AIO's
-                * may be already running. Just fail this IO with EINTR.
-                */
-               if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
-                            ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
-                       ret = -EINTR;
-               aio_complete(iocb, ret, 0);
-       }
-out:
-       spin_lock_irq(&ctx->ctx_lock);
-
-       if (-EIOCBRETRY == ret) {
-               /*
-                * OK, now that we are done with this iteration
-                * and know that there is more left to go,
-                * this is where we let go so that a subsequent
-                * "kick" can start the next iteration
-                */
-
-               /* will make __queue_kicked_iocb succeed from here on */
-               INIT_LIST_HEAD(&iocb->ki_run_list);
-               /* we must queue the next iteration ourselves, if it
-                * has already been kicked */
-               if (kiocbIsKicked(iocb)) {
-                       __queue_kicked_iocb(iocb);
-
-                       /*
-                        * __queue_kicked_iocb will always return 1 here, because
-                        * iocb->ki_run_list is empty at this point so it should
-                        * be safe to unconditionally queue the context into the
-                        * work queue.
-                        */
-                       aio_queue_work(ctx);
-               }
-       }
-       return ret;
-}
-
-/*
- * __aio_run_iocbs:
- *     Process all pending retries queued on the ioctx
- *     run list.
- * Assumes it is operating within the aio issuer's mm
- * context.
- */
-static int __aio_run_iocbs(struct kioctx *ctx)
-{
-       struct kiocb *iocb;
-       struct list_head run_list;
-
-       assert_spin_locked(&ctx->ctx_lock);
-
-       list_replace_init(&ctx->run_list, &run_list);
-       while (!list_empty(&run_list)) {
-               iocb = list_entry(run_list.next, struct kiocb,
-                       ki_run_list);
-               list_del(&iocb->ki_run_list);
-               /*
-                * Hold an extra reference while retrying i/o.
-                */
-               iocb->ki_users++;       /* grab extra reference */
-               aio_run_iocb(iocb);
-               __aio_put_req(ctx, iocb);
-       }
-       if (!list_empty(&ctx->run_list))
-               return 1;
-       return 0;
-}
-
-static void aio_queue_work(struct kioctx * ctx)
-{
-       unsigned long timeout;
-       /*
-        * if someone is waiting, get the work started right
-        * away, otherwise, use a longer delay
-        */
-       smp_mb();
-       if (waitqueue_active(&ctx->wait))
-               timeout = 1;
-       else
-               timeout = HZ/10;
-       queue_delayed_work(aio_wq, &ctx->wq, timeout);
-}
-
-/*
- * aio_run_all_iocbs:
- *     Process all pending retries queued on the ioctx
- *     run list, and keep running them until the list
- *     stays empty.
- * Assumes it is operating within the aio issuer's mm context.
- */
-static inline void aio_run_all_iocbs(struct kioctx *ctx)
-{
-       spin_lock_irq(&ctx->ctx_lock);
-       while (__aio_run_iocbs(ctx))
-               ;
-       spin_unlock_irq(&ctx->ctx_lock);
-}
-
-/*
- * aio_kick_handler:
- *     Work queue handler triggered to process pending
- *     retries on an ioctx. Takes on the aio issuer's
- *     mm context before running the iocbs, so that
- *     copy_xxx_user operates on the issuer's address
- *      space.
- * Run on aiod's context.
- */
-static void aio_kick_handler(struct work_struct *work)
-{
-       struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
-       mm_segment_t oldfs = get_fs();
-       struct mm_struct *mm;
-       int requeue;
-
-       set_fs(USER_DS);
-       use_mm(ctx->mm);
-       spin_lock_irq(&ctx->ctx_lock);
-       requeue =__aio_run_iocbs(ctx);
-       mm = ctx->mm;
-       spin_unlock_irq(&ctx->ctx_lock);
-       unuse_mm(mm);
-       set_fs(oldfs);
-       /*
-        * we're in a worker thread already; no point using non-zero delay
-        */
-       if (requeue)
-               queue_delayed_work(aio_wq, &ctx->wq, 0);
-}
-
-
-/*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
- */
-static void try_queue_kicked_iocb(struct kiocb *iocb)
-{
-       struct kioctx   *ctx = iocb->ki_ctx;
-       unsigned long flags;
-       int run = 0;
-
-       spin_lock_irqsave(&ctx->ctx_lock, flags);
-       /* set this inside the lock so that we can't race with aio_run_iocb()
-        * testing it and putting the iocb on the run list under the lock */
-       if (!kiocbTryKick(iocb))
-               run = __queue_kicked_iocb(iocb);
-       spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-       if (run)
-               aio_queue_work(ctx);
-}
-
-/*
- * kick_iocb:
- *      Called typically from a wait queue callback context
- *      to trigger a retry of the iocb.
- *      The retry is usually executed by aio workqueue
- *      threads (See aio_kick_handler).
- */
-void kick_iocb(struct kiocb *iocb)
-{
-       /* sync iocbs are easy: they can only ever be executing from a 
-        * single context. */
-       if (is_sync_kiocb(iocb)) {
-               kiocbSetKicked(iocb);
-               wake_up_process(iocb->ki_obj.tsk);
-               return;
-       }
-
-       try_queue_kicked_iocb(iocb);
-}
-EXPORT_SYMBOL(kick_iocb);
-
 /* aio_complete
  *     Called when the io request on the given iocb is complete.
  *     Returns true if this is the last user of the request.  The 
@@ -926,9 +631,6 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
         */
        spin_lock_irqsave(&ctx->ctx_lock, flags);
 
-       if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
-               list_del_init(&iocb->ki_run_list);
-
        /*
         * cancelled requests don't get events, userland was given one
         * when the event got cancelled.
@@ -1083,13 +785,11 @@ static int read_events(struct kioctx *ctx,
        int                     i = 0;
        struct io_event         ent;
        struct aio_timeout      to;
-       int                     retry = 0;
 
        /* needed to zero any padding within an entry (there shouldn't be 
         * any, but C is fun!
         */
        memset(&ent, 0, sizeof(ent));
-retry:
        ret = 0;
        while (likely(i < nr)) {
                ret = aio_read_evt(ctx, &ent);
@@ -1119,13 +819,6 @@ retry:
 
        /* End fast path */
 
-       /* racey check, but it gets redone */
-       if (!retry && unlikely(!list_empty(&ctx->run_list))) {
-               retry = 1;
-               aio_run_all_iocbs(ctx);
-               goto retry;
-       }
-
        init_timeout(&to);
        if (timeout) {
                struct timespec ts;
@@ -1345,7 +1038,7 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
        /* If we managed to write some out we return that, rather than
         * the eventual error. */
        if (opcode == IOCB_CMD_PWRITEV
-           && ret < 0 && ret != -EIOCBQUEUED && ret != -EIOCBRETRY
+           && ret < 0 && ret != -EIOCBQUEUED
            && iocb->ki_nbytes - iocb->ki_left)
                ret = iocb->ki_nbytes - iocb->ki_left;
 
@@ -1587,18 +1280,27 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
         * don't see ctx->dead set here, io_destroy() waits for our IO to
         * finish.
         */
-       if (ctx->dead) {
-               spin_unlock_irq(&ctx->ctx_lock);
+       if (ctx->dead)
                ret = -EINVAL;
+       spin_unlock_irq(&ctx->ctx_lock);
+       if (ret)
                goto out_put_req;
+
+       if (unlikely(kiocbIsCancelled(req))) {
+               ret = -EINTR;
+       } else {
+               ret = req->ki_retry(req);
        }
-       aio_run_iocb(req);
-       if (!list_empty(&ctx->run_list)) {
-               /* drain the run list */
-               while (__aio_run_iocbs(ctx))
-                       ;
+       if (ret != -EIOCBQUEUED) {
+               /*
+                * There's no easy way to restart the syscall since other AIO's
+                * may be already running. Just fail this IO with EINTR.
+                */
+               if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+                            ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
+                       ret = -EINTR;
+               aio_complete(req, ret, 0);
        }
-       spin_unlock_irq(&ctx->ctx_lock);
 
        aio_put_req(req);       /* drop extra ref to req */
        return 0;