]> git.karo-electronics.de Git - karo-tx-linux.git/blob - fs/aio.c
block-aio-batch-completion-for-bios-kiocbs-fix-fix
[karo-tx-linux.git] / fs / aio.c
1 /*
2  *      An async IO implementation for Linux
3  *      Written by Benjamin LaHaise <bcrl@kvack.org>
4  *
5  *      Implements an efficient asynchronous io interface.
6  *
7  *      Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
8  *
9  *      See ../COPYING for licensing terms.
10  */
11 #define pr_fmt(fmt) "%s: " fmt, __func__
12
13 #include <linux/kernel.h>
14 #include <linux/init.h>
15 #include <linux/errno.h>
16 #include <linux/time.h>
17 #include <linux/aio_abi.h>
18 #include <linux/export.h>
19 #include <linux/syscalls.h>
20 #include <linux/backing-dev.h>
21 #include <linux/uio.h>
22
23 #include <linux/sched.h>
24 #include <linux/fs.h>
25 #include <linux/file.h>
26 #include <linux/mm.h>
27 #include <linux/mman.h>
28 #include <linux/bio.h>
29 #include <linux/mmu_context.h>
30 #include <linux/percpu.h>
31 #include <linux/slab.h>
32 #include <linux/timer.h>
33 #include <linux/aio.h>
34 #include <linux/highmem.h>
35 #include <linux/workqueue.h>
36 #include <linux/security.h>
37 #include <linux/eventfd.h>
38 #include <linux/blkdev.h>
39 #include <linux/compat.h>
40 #include <linux/percpu-refcount.h>
41
42 #include <asm/kmap_types.h>
43 #include <asm/uaccess.h>
44
45 #define AIO_RING_MAGIC                  0xa10a10a1
46 #define AIO_RING_COMPAT_FEATURES        1
47 #define AIO_RING_INCOMPAT_FEATURES      0
48 struct aio_ring {
49         unsigned        id;     /* kernel internal index number */
50         unsigned        nr;     /* number of io_events */
51         unsigned        head;
52         unsigned        tail;
53
54         unsigned        magic;
55         unsigned        compat_features;
56         unsigned        incompat_features;
57         unsigned        header_length;  /* size of aio_ring */
58
59
60         struct io_event         io_events[0];
61 }; /* 128 bytes + ring size */
62
63 #define AIO_RING_PAGES  8
64
65 struct kioctx_cpu {
66         unsigned                reqs_available;
67 };
68
69 struct kioctx {
70         struct percpu_ref       users;
71
72         /* This needs improving */
73         unsigned long           user_id;
74         struct hlist_node       list;
75
76         struct __percpu kioctx_cpu *cpu;
77
78         unsigned                req_batch;
79
80         unsigned                nr;
81
82         /* sys_io_setup currently limits this to an unsigned int */
83         unsigned                max_reqs;
84
85         unsigned long           mmap_base;
86         unsigned long           mmap_size;
87
88         struct page             **ring_pages;
89         long                    nr_pages;
90
91         struct rcu_head         rcu_head;
92         struct work_struct      rcu_work;
93
94         struct {
95                 atomic_t        reqs_available;
96         } ____cacheline_aligned_in_smp;
97
98         struct {
99                 spinlock_t      ctx_lock;
100                 struct list_head active_reqs;   /* used for cancellation */
101         } ____cacheline_aligned_in_smp;
102
103         struct {
104                 struct mutex    ring_lock;
105                 wait_queue_head_t wait;
106                 unsigned        shadow_tail;
107         } ____cacheline_aligned_in_smp;
108
109         struct {
110                 unsigned        tail;
111         } ____cacheline_aligned_in_smp;
112
113         struct page             *internal_pages[AIO_RING_PAGES];
114 };
115
116 /*------ sysctl variables----*/
117 static DEFINE_SPINLOCK(aio_nr_lock);
118 unsigned long aio_nr;           /* current system wide number of aio requests */
119 unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
120 /*----end sysctl variables---*/
121
122 static struct kmem_cache        *kiocb_cachep;
123 static struct kmem_cache        *kioctx_cachep;
124
125 /* aio_setup
126  *      Creates the slab caches used by the aio routines, panic on
127  *      failure as this is done early during the boot sequence.
128  */
129 static int __init aio_setup(void)
130 {
131         kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
132         kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
133
134         pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
135
136         return 0;
137 }
138 __initcall(aio_setup);
139
140 static void aio_free_ring(struct kioctx *ctx)
141 {
142         long i;
143
144         for (i = 0; i < ctx->nr_pages; i++)
145                 put_page(ctx->ring_pages[i]);
146
147         if (ctx->mmap_size)
148                 vm_munmap(ctx->mmap_base, ctx->mmap_size);
149
150         if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
151                 kfree(ctx->ring_pages);
152 }
153
154 static int aio_setup_ring(struct kioctx *ctx)
155 {
156         struct aio_ring *ring;
157         unsigned nr_events = ctx->max_reqs;
158         struct mm_struct *mm = current->mm;
159         unsigned long size, populate;
160         int nr_pages;
161
162         nr_events = max(nr_events, num_possible_cpus() * 4);
163         nr_events *= 2;
164
165         /* Compensate for the ring buffer's head/tail overlap entry */
166         nr_events += 2; /* 1 is required, 2 for good luck */
167
168         size = sizeof(struct aio_ring);
169         size += sizeof(struct io_event) * nr_events;
170         nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
171
172         if (nr_pages < 0)
173                 return -EINVAL;
174
175         nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
176
177         ctx->nr = 0;
178         ctx->ring_pages = ctx->internal_pages;
179         if (nr_pages > AIO_RING_PAGES) {
180                 ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
181                                           GFP_KERNEL);
182                 if (!ctx->ring_pages)
183                         return -ENOMEM;
184         }
185
186         ctx->mmap_size = nr_pages * PAGE_SIZE;
187         pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);
188         down_write(&mm->mmap_sem);
189         ctx->mmap_base = do_mmap_pgoff(NULL, 0, ctx->mmap_size,
190                                        PROT_READ|PROT_WRITE,
191                                        MAP_ANONYMOUS|MAP_PRIVATE, 0, &populate);
192         if (IS_ERR((void *)ctx->mmap_base)) {
193                 up_write(&mm->mmap_sem);
194                 ctx->mmap_size = 0;
195                 aio_free_ring(ctx);
196                 return -EAGAIN;
197         }
198
199         pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
200         ctx->nr_pages = get_user_pages(current, mm, ctx->mmap_base, nr_pages,
201                                        1, 0, ctx->ring_pages, NULL);
202         up_write(&mm->mmap_sem);
203
204         if (unlikely(ctx->nr_pages != nr_pages)) {
205                 aio_free_ring(ctx);
206                 return -EAGAIN;
207         }
208         if (populate)
209                 mm_populate(ctx->mmap_base, populate);
210
211         ctx->user_id = ctx->mmap_base;
212         ctx->nr = nr_events;            /* trusted copy */
213
214         ring = kmap_atomic(ctx->ring_pages[0]);
215         ring->nr = nr_events;   /* user copy */
216         ring->id = ctx->user_id;
217         ring->head = ring->tail = 0;
218         ring->magic = AIO_RING_MAGIC;
219         ring->compat_features = AIO_RING_COMPAT_FEATURES;
220         ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
221         ring->header_length = sizeof(struct aio_ring);
222         kunmap_atomic(ring);
223         flush_dcache_page(ctx->ring_pages[0]);
224
225         return 0;
226 }
227
228 #define AIO_EVENTS_PER_PAGE     (PAGE_SIZE / sizeof(struct io_event))
229 #define AIO_EVENTS_FIRST_PAGE   ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
230 #define AIO_EVENTS_OFFSET       (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
231
232 void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
233 {
234         struct kioctx *ctx = req->ki_ctx;
235         unsigned long flags;
236
237         spin_lock_irqsave(&ctx->ctx_lock, flags);
238
239         if (!req->ki_list.next)
240                 list_add(&req->ki_list, &ctx->active_reqs);
241
242         req->ki_cancel = cancel;
243
244         spin_unlock_irqrestore(&ctx->ctx_lock, flags);
245 }
246 EXPORT_SYMBOL(kiocb_set_cancel_fn);
247
248 static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
249                         struct io_event *res)
250 {
251         kiocb_cancel_fn *old, *cancel;
252         int ret = -EINVAL;
253
254         /*
255          * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
256          * actually has a cancel function, hence the cmpxchg()
257          */
258
259         cancel = ACCESS_ONCE(kiocb->ki_cancel);
260         do {
261                 if (!cancel || cancel == KIOCB_CANCELLED)
262                         return ret;
263
264                 old = cancel;
265                 cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
266         } while (cancel != old);
267
268         atomic_inc(&kiocb->ki_users);
269         spin_unlock_irq(&ctx->ctx_lock);
270
271         memset(res, 0, sizeof(*res));
272         res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
273         res->data = kiocb->ki_user_data;
274         ret = cancel(kiocb, res);
275
276         spin_lock_irq(&ctx->ctx_lock);
277
278         return ret;
279 }
280
281 static void free_ioctx_rcu(struct rcu_head *head)
282 {
283         struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
284
285         free_percpu(ctx->cpu);
286         kmem_cache_free(kioctx_cachep, ctx);
287 }
288
289 /*
290  * When this function runs, the kioctx has been removed from the "hash table"
291  * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
292  * now it's safe to cancel any that need to be.
293  */
294 static void free_ioctx(struct kioctx *ctx)
295 {
296         struct aio_ring *ring;
297         struct io_event res;
298         struct kiocb *req;
299         unsigned cpu, head, avail;
300
301         spin_lock_irq(&ctx->ctx_lock);
302
303         while (!list_empty(&ctx->active_reqs)) {
304                 req = list_first_entry(&ctx->active_reqs,
305                                        struct kiocb, ki_list);
306
307                 list_del_init(&req->ki_list);
308                 kiocb_cancel(ctx, req, &res);
309         }
310
311         spin_unlock_irq(&ctx->ctx_lock);
312
313         for_each_possible_cpu(cpu) {
314                 struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
315
316                 atomic_add(kcpu->reqs_available, &ctx->reqs_available);
317                 kcpu->reqs_available = 0;
318         }
319
320         ring = kmap_atomic(ctx->ring_pages[0]);
321         head = ring->head;
322         kunmap_atomic(ring);
323
324         while (atomic_read(&ctx->reqs_available) < ctx->nr) {
325                 wait_event(ctx->wait, head != ctx->shadow_tail);
326
327                 avail = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
328
329                 atomic_add(avail, &ctx->reqs_available);
330                 head += avail;
331                 head %= ctx->nr;
332         }
333
334         WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr);
335
336         aio_free_ring(ctx);
337
338         spin_lock(&aio_nr_lock);
339         BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
340         aio_nr -= ctx->max_reqs;
341         spin_unlock(&aio_nr_lock);
342
343         pr_debug("freeing %p\n", ctx);
344
345         /*
346          * Here the call_rcu() is between the wait_event() for reqs_active to
347          * hit 0, and freeing the ioctx.
348          *
349          * aio_complete() decrements reqs_active, but it has to touch the ioctx
350          * after to issue a wakeup so we use rcu.
351          */
352         call_rcu(&ctx->rcu_head, free_ioctx_rcu);
353 }
354
355 static void put_ioctx(struct kioctx *ctx)
356 {
357         if (percpu_ref_put(&ctx->users))
358                 free_ioctx(ctx);
359 }
360
361 /* ioctx_alloc
362  *      Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
363  */
364 static struct kioctx *ioctx_alloc(unsigned nr_events)
365 {
366         struct mm_struct *mm = current->mm;
367         struct kioctx *ctx;
368         int err = -ENOMEM;
369
370         /* Prevent overflows */
371         if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
372             (nr_events > (0x10000000U / sizeof(struct kiocb)))) {
373                 pr_debug("ENOMEM: nr_events too high\n");
374                 return ERR_PTR(-EINVAL);
375         }
376
377         if (!nr_events || (unsigned long)nr_events > aio_max_nr)
378                 return ERR_PTR(-EAGAIN);
379
380         ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
381         if (!ctx)
382                 return ERR_PTR(-ENOMEM);
383
384         ctx->max_reqs = nr_events;
385
386         percpu_ref_init(&ctx->users);
387         rcu_read_lock();
388         percpu_ref_get(&ctx->users);
389         rcu_read_unlock();
390
391         spin_lock_init(&ctx->ctx_lock);
392         mutex_init(&ctx->ring_lock);
393         init_waitqueue_head(&ctx->wait);
394
395         INIT_LIST_HEAD(&ctx->active_reqs);
396
397         ctx->cpu = alloc_percpu(struct kioctx_cpu);
398         if (!ctx->cpu)
399                 goto out_freectx;
400
401         if (aio_setup_ring(ctx) < 0)
402                 goto out_freepcpu;
403
404         atomic_set(&ctx->reqs_available, ctx->nr);
405         ctx->req_batch = ctx->nr / (num_possible_cpus() * 4);
406         BUG_ON(!ctx->req_batch);
407
408         /* limit the number of system wide aios */
409         spin_lock(&aio_nr_lock);
410         if (aio_nr + nr_events > aio_max_nr ||
411             aio_nr + nr_events < aio_nr) {
412                 spin_unlock(&aio_nr_lock);
413                 goto out_cleanup;
414         }
415         aio_nr += ctx->max_reqs;
416         spin_unlock(&aio_nr_lock);
417
418         /* now link into global list. */
419         spin_lock(&mm->ioctx_lock);
420         hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
421         spin_unlock(&mm->ioctx_lock);
422
423         pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
424                  ctx, ctx->user_id, mm, ctx->nr);
425         return ctx;
426
427 out_cleanup:
428         err = -EAGAIN;
429         aio_free_ring(ctx);
430 out_freepcpu:
431         free_percpu(ctx->cpu);
432 out_freectx:
433         kmem_cache_free(kioctx_cachep, ctx);
434         pr_debug("error allocating ioctx %d\n", err);
435         return ERR_PTR(err);
436 }
437
438 static void kill_ioctx_work(struct work_struct *work)
439 {
440         struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
441
442         wake_up_all(&ctx->wait);
443         put_ioctx(ctx);
444 }
445
446 static void kill_ioctx_rcu(struct rcu_head *head)
447 {
448         struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
449
450         INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
451         schedule_work(&ctx->rcu_work);
452 }
453
454 /* kill_ioctx
455  *      Cancels all outstanding aio requests on an aio context.  Used
456  *      when the processes owning a context have all exited to encourage
457  *      the rapid destruction of the kioctx.
458  */
459 static void kill_ioctx(struct kioctx *ctx)
460 {
461         if (percpu_ref_kill(&ctx->users)) {
462                 hlist_del_rcu(&ctx->list);
463                 /* Between hlist_del_rcu() and dropping the initial ref */
464                 synchronize_rcu();
465
466                 /*
467                  * We can't punt to workqueue here because put_ioctx() ->
468                  * free_ioctx() will unmap the ringbuffer, and that has to be
469                  * done in the original process's context. kill_ioctx_rcu/work()
470                  * exist for exit_aio(), as in that path free_ioctx() won't do
471                  * the unmap.
472                  */
473                 kill_ioctx_work(&ctx->rcu_work);
474         }
475 }
476
477 /* wait_on_sync_kiocb:
478  *      Waits on the given sync kiocb to complete.
479  */
480 ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
481 {
482         while (atomic_read(&iocb->ki_users)) {
483                 set_current_state(TASK_UNINTERRUPTIBLE);
484                 if (!atomic_read(&iocb->ki_users))
485                         break;
486                 io_schedule();
487         }
488         __set_current_state(TASK_RUNNING);
489         return iocb->ki_user_data;
490 }
491 EXPORT_SYMBOL(wait_on_sync_kiocb);
492
493 /*
494  * exit_aio: called when the last user of mm goes away.  At this point, there is
495  * no way for any new requests to be submited or any of the io_* syscalls to be
496  * called on the context.
497  *
498  * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
499  * them.
500  */
501 void exit_aio(struct mm_struct *mm)
502 {
503         struct kioctx *ctx;
504         struct hlist_node *n;
505
506         hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) {
507                 /*
508                  * We don't need to bother with munmap() here -
509                  * exit_mmap(mm) is coming and it'll unmap everything.
510                  * Since aio_free_ring() uses non-zero ->mmap_size
511                  * as indicator that it needs to unmap the area,
512                  * just set it to 0; aio_free_ring() is the only
513                  * place that uses ->mmap_size, so it's safe.
514                  */
515                 ctx->mmap_size = 0;
516
517                 if (percpu_ref_kill(&ctx->users)) {
518                         hlist_del_rcu(&ctx->list);
519                         call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
520                 }
521         }
522 }
523
524 static void put_reqs_available(struct kioctx *ctx, unsigned nr)
525 {
526         struct kioctx_cpu *kcpu;
527
528         preempt_disable();
529         kcpu = this_cpu_ptr(ctx->cpu);
530
531         kcpu->reqs_available += nr;
532         while (kcpu->reqs_available >= ctx->req_batch * 2) {
533                 kcpu->reqs_available -= ctx->req_batch;
534                 atomic_add(ctx->req_batch, &ctx->reqs_available);
535         }
536
537         preempt_enable();
538 }
539
540 static bool get_reqs_available(struct kioctx *ctx)
541 {
542         struct kioctx_cpu *kcpu;
543         bool ret = false;
544
545         preempt_disable();
546         kcpu = this_cpu_ptr(ctx->cpu);
547
548         if (!kcpu->reqs_available) {
549                 int old, avail = atomic_read(&ctx->reqs_available);
550
551                 do {
552                         if (avail < ctx->req_batch)
553                                 goto out;
554
555                         old = avail;
556                         avail = atomic_cmpxchg(&ctx->reqs_available,
557                                                avail, avail - ctx->req_batch);
558                 } while (avail != old);
559
560                 kcpu->reqs_available += ctx->req_batch;
561         }
562
563         ret = true;
564         kcpu->reqs_available--;
565 out:
566         preempt_enable();
567         return ret;
568 }
569
570 /* aio_get_req
571  *      Allocate a slot for an aio request.  Increments the ki_users count
572  * of the kioctx so that the kioctx stays around until all requests are
573  * complete.  Returns NULL if no requests are free.
574  *
575  * Returns with kiocb->ki_users set to 2.  The io submit code path holds
576  * an extra reference while submitting the i/o.
577  * This prevents races between the aio code path referencing the
578  * req (after submitting it) and aio_complete() freeing the req.
579  */
580 static inline struct kiocb *aio_get_req(struct kioctx *ctx)
581 {
582         struct kiocb *req;
583
584         if (!get_reqs_available(ctx))
585                 return NULL;
586
587         req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
588         if (unlikely(!req))
589                 goto out_put;
590
591         atomic_set(&req->ki_users, 2);
592         req->ki_ctx = ctx;
593         return req;
594 out_put:
595         put_reqs_available(ctx, 1);
596         return NULL;
597 }
598
599 static void kiocb_free(struct kiocb *req)
600 {
601         if (req->ki_filp)
602                 fput(req->ki_filp);
603         if (req->ki_eventfd != NULL)
604                 eventfd_ctx_put(req->ki_eventfd);
605         if (req->ki_dtor)
606                 req->ki_dtor(req);
607         if (req->ki_iovec != &req->ki_inline_vec)
608                 kfree(req->ki_iovec);
609         kmem_cache_free(kiocb_cachep, req);
610 }
611
612 void aio_put_req(struct kiocb *req)
613 {
614         if (atomic_dec_and_test(&req->ki_users))
615                 kiocb_free(req);
616 }
617 EXPORT_SYMBOL(aio_put_req);
618
619 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
620 {
621         struct mm_struct *mm = current->mm;
622         struct kioctx *ctx, *ret = NULL;
623
624         rcu_read_lock();
625
626         hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) {
627                 if (ctx->user_id == ctx_id){
628                         percpu_ref_get(&ctx->users);
629                         ret = ctx;
630                         break;
631                 }
632         }
633
634         rcu_read_unlock();
635         return ret;
636 }
637
638 static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req,
639                                        unsigned tail)
640 {
641         struct io_event *ev_page, *event;
642         unsigned pos = tail + AIO_EVENTS_OFFSET;
643
644         if (++tail >= ctx->nr)
645                 tail = 0;
646
647         ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
648         event = ev_page + pos % AIO_EVENTS_PER_PAGE;
649
650         event->obj      = (u64)(unsigned long)req->ki_obj.user;
651         event->data     = req->ki_user_data;
652         event->res      = req->ki_res;
653         event->res2     = req->ki_res2;
654
655         kunmap_atomic(ev_page);
656         flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
657
658         pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
659                  ctx, tail, req, req->ki_obj.user, req->ki_user_data,
660                  req->ki_res, req->ki_res2);
661
662         return tail;
663 }
664
665 static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
666 {
667         unsigned tail;
668
669         /*
670          * ctx->tail is both our lock and the canonical version of the tail
671          * pointer.
672          */
673         while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
674                 cpu_relax();
675
676         return tail;
677 }
678
679 static inline void kioctx_ring_unlock(struct kioctx *ctx, unsigned tail)
680 {
681         struct aio_ring *ring;
682
683         smp_wmb();
684         /* make event visible before updating tail */
685
686         ctx->shadow_tail = tail;
687
688         ring = kmap_atomic(ctx->ring_pages[0]);
689         ring->tail = tail;
690         kunmap_atomic(ring);
691         flush_dcache_page(ctx->ring_pages[0]);
692
693         /* unlock, make new tail visible before checking waitlist */
694         smp_mb();
695
696         ctx->tail = tail;
697
698         if (waitqueue_active(&ctx->wait))
699                 wake_up(&ctx->wait);
700 }
701
702 void batch_complete_aio(struct batch_complete *batch)
703 {
704         struct kioctx *ctx = NULL;
705         struct eventfd_ctx *eventfd = NULL;
706         struct rb_node *n;
707         unsigned long flags;
708         unsigned tail = 0;
709
710         if (RB_EMPTY_ROOT(&batch->kiocb))
711                 return;
712
713         /*
714          * Take rcu_read_lock() in case the kioctx is being destroyed, as we
715          * need to issue a wakeup after incrementing reqs_available.
716          */
717         rcu_read_lock();
718         local_irq_save(flags);
719
720         n = rb_first(&batch->kiocb);
721         while (n) {
722                 struct kiocb *req = container_of(n, struct kiocb, ki_node);
723
724                 if (n->rb_right) {
725                         n->rb_right->__rb_parent_color = n->__rb_parent_color;
726                         n = n->rb_right;
727
728                         while (n->rb_left)
729                                 n = n->rb_left;
730                 } else {
731                         n = rb_parent(n);
732                 }
733
734                 if (unlikely(xchg(&req->ki_cancel,
735                                   KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
736                         /*
737                          * Can't use the percpu reqs_available here - could race
738                          * with free_ioctx()
739                          */
740                         atomic_inc(&req->ki_ctx->reqs_available);
741                         aio_put_req(req);
742                         continue;
743                 }
744
745                 if (unlikely(req->ki_eventfd != eventfd)) {
746                         if (eventfd) {
747                                 /* Make event visible */
748                                 kioctx_ring_unlock(ctx, tail);
749                                 ctx = NULL;
750
751                                 eventfd_signal(eventfd, 1);
752                                 eventfd_ctx_put(eventfd);
753                         }
754
755                         eventfd = req->ki_eventfd;
756                         req->ki_eventfd = NULL;
757                 }
758
759                 if (unlikely(req->ki_ctx != ctx)) {
760                         if (ctx)
761                                 kioctx_ring_unlock(ctx, tail);
762
763                         ctx = req->ki_ctx;
764                         tail = kioctx_ring_lock(ctx);
765                 }
766
767                 tail = kioctx_ring_put(ctx, req, tail);
768                 aio_put_req(req);
769         }
770
771         kioctx_ring_unlock(ctx, tail);
772         local_irq_restore(flags);
773         rcu_read_unlock();
774
775         /*
776          * Check if the user asked us to deliver the result through an
777          * eventfd. The eventfd_signal() function is safe to be called
778          * from IRQ context.
779          */
780         if (eventfd) {
781                 eventfd_signal(eventfd, 1);
782                 eventfd_ctx_put(eventfd);
783         }
784 }
785 EXPORT_SYMBOL(batch_complete_aio);
786
787 /* aio_complete_batch
788  *      Called when the io request on the given iocb is complete; @batch may be
789  *      NULL.
790  */
791 void aio_complete_batch(struct kiocb *req, long res, long res2,
792                         struct batch_complete *batch)
793 {
794         req->ki_res = res;
795         req->ki_res2 = res2;
796
797         if (req->ki_list.next) {
798                 struct kioctx *ctx = req->ki_ctx;
799                 unsigned long flags;
800
801                 spin_lock_irqsave(&ctx->ctx_lock, flags);
802                 list_del(&req->ki_list);
803                 spin_unlock_irqrestore(&ctx->ctx_lock, flags);
804         }
805
806         /*
807          * Special case handling for sync iocbs:
808          *  - events go directly into the iocb for fast handling
809          *  - the sync task with the iocb in its stack holds the single iocb
810          *    ref, no other paths have a way to get another ref
811          *  - the sync task helpfully left a reference to itself in the iocb
812          */
813         if (is_sync_kiocb(req)) {
814                 BUG_ON(atomic_read(&req->ki_users) != 1);
815                 req->ki_user_data = req->ki_res;
816                 atomic_set(&req->ki_users, 0);
817                 wake_up_process(req->ki_obj.tsk);
818         } else if (batch) {
819                 int res;
820                 struct kiocb *t;
821                 struct rb_node **n = &batch->kiocb.rb_node, *parent = NULL;
822
823                 while (*n) {
824                         parent = *n;
825                         t = container_of(*n, struct kiocb, ki_node);
826
827                         res = req->ki_ctx != t->ki_ctx
828                                 ? req->ki_ctx < t->ki_ctx
829                                 : req->ki_eventfd != t->ki_eventfd
830                                 ? req->ki_eventfd < t->ki_eventfd
831                                 : req < t;
832
833                         n = res ? &(*n)->rb_left : &(*n)->rb_right;
834                 }
835
836                 rb_link_node(&req->ki_node, parent, n);
837                 rb_insert_color(&req->ki_node, &batch->kiocb);
838         } else {
839                 struct batch_complete batch_stack;
840
841                 memset(&req->ki_node, 0, sizeof(req->ki_node));
842                 batch_stack.kiocb.rb_node = &req->ki_node;
843
844                 batch_complete_aio(&batch_stack);
845         }
846 }
847 EXPORT_SYMBOL(aio_complete_batch);
848
849 /* aio_read_events
850  *      Pull an event off of the ioctx's event ring.  Returns the number of
851  *      events fetched
852  */
853 static int aio_read_events_ring(struct kioctx *ctx,
854                                 struct io_event __user *event, long nr)
855 {
856         struct aio_ring *ring;
857         unsigned head, pos;
858         int ret = 0, copy_ret;
859
860         if (!mutex_trylock(&ctx->ring_lock)) {
861                 __set_current_state(TASK_RUNNING);
862                 mutex_lock(&ctx->ring_lock);
863         }
864
865         ring = kmap_atomic(ctx->ring_pages[0]);
866         head = ring->head;
867         kunmap_atomic(ring);
868
869         pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr);
870
871         if (head == ctx->shadow_tail)
872                 goto out;
873
874         __set_current_state(TASK_RUNNING);
875
876         while (ret < nr) {
877                 unsigned i = (head < ctx->shadow_tail ? ctx->shadow_tail : ctx->nr) - head;
878                 struct io_event *ev;
879                 struct page *page;
880
881                 if (head == ctx->shadow_tail)
882                         break;
883
884                 i = min_t(int, i, nr - ret);
885                 i = min_t(int, i, AIO_EVENTS_PER_PAGE -
886                           ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));
887
888                 pos = head + AIO_EVENTS_OFFSET;
889                 page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
890                 pos %= AIO_EVENTS_PER_PAGE;
891
892                 ev = kmap(page);
893                 copy_ret = copy_to_user(event + ret, ev + pos, sizeof(*ev) * i);
894                 kunmap(page);
895
896                 if (unlikely(copy_ret)) {
897                         ret = -EFAULT;
898                         goto out;
899                 }
900
901                 ret += i;
902                 head += i;
903                 head %= ctx->nr;
904         }
905
906         ring = kmap_atomic(ctx->ring_pages[0]);
907         ring->head = head;
908         kunmap_atomic(ring);
909         flush_dcache_page(ctx->ring_pages[0]);
910
911         pr_debug("%d  h%u t%u\n", ret, head, ctx->shadow_tail);
912
913         put_reqs_available(ctx, ret);
914 out:
915         mutex_unlock(&ctx->ring_lock);
916
917         return ret;
918 }
919
920 static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
921                             struct io_event __user *event, long *i)
922 {
923         long ret = aio_read_events_ring(ctx, event + *i, nr - *i);
924
925         if (ret > 0)
926                 *i += ret;
927
928         if (unlikely(percpu_ref_dead(&ctx->users)))
929                 ret = -EINVAL;
930
931         if (!*i)
932                 *i = ret;
933
934         return ret < 0 || *i >= min_nr;
935 }
936
937 static long read_events(struct kioctx *ctx, long min_nr, long nr,
938                         struct io_event __user *event,
939                         struct timespec __user *timeout)
940 {
941         ktime_t until = { .tv64 = KTIME_MAX };
942         long ret = 0;
943
944         if (timeout) {
945                 struct timespec ts;
946
947                 if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
948                         return -EFAULT;
949
950                 until = timespec_to_ktime(ts);
951         }
952
953         wait_event_interruptible_hrtimeout(ctx->wait,
954                         aio_read_events(ctx, min_nr, nr, event, &ret), until);
955
956         if (!ret && signal_pending(current))
957                 ret = -EINTR;
958
959         return ret;
960 }
961
962 /* sys_io_setup:
963  *      Create an aio_context capable of receiving at least nr_events.
964  *      ctxp must not point to an aio_context that already exists, and
965  *      must be initialized to 0 prior to the call.  On successful
966  *      creation of the aio_context, *ctxp is filled in with the resulting 
967  *      handle.  May fail with -EINVAL if *ctxp is not initialized,
968  *      if the specified nr_events exceeds internal limits.  May fail 
969  *      with -EAGAIN if the specified nr_events exceeds the user's limit 
970  *      of available events.  May fail with -ENOMEM if insufficient kernel
971  *      resources are available.  May fail with -EFAULT if an invalid
972  *      pointer is passed for ctxp.  Will fail with -ENOSYS if not
973  *      implemented.
974  */
975 SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
976 {
977         struct kioctx *ioctx = NULL;
978         unsigned long ctx;
979         long ret;
980
981         ret = get_user(ctx, ctxp);
982         if (unlikely(ret))
983                 goto out;
984
985         ret = -EINVAL;
986         if (unlikely(ctx || nr_events == 0)) {
987                 pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n",
988                          ctx, nr_events);
989                 goto out;
990         }
991
992         ioctx = ioctx_alloc(nr_events);
993         ret = PTR_ERR(ioctx);
994         if (!IS_ERR(ioctx)) {
995                 ret = put_user(ioctx->user_id, ctxp);
996                 if (ret)
997                         kill_ioctx(ioctx);
998                 put_ioctx(ioctx);
999         }
1000
1001 out:
1002         return ret;
1003 }
1004
1005 /* sys_io_destroy:
1006  *      Destroy the aio_context specified.  May cancel any outstanding 
1007  *      AIOs and block on completion.  Will fail with -ENOSYS if not
1008  *      implemented.  May fail with -EINVAL if the context pointed to
1009  *      is invalid.
1010  */
1011 SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
1012 {
1013         struct kioctx *ioctx = lookup_ioctx(ctx);
1014         if (likely(NULL != ioctx)) {
1015                 kill_ioctx(ioctx);
1016                 put_ioctx(ioctx);
1017                 return 0;
1018         }
1019         pr_debug("EINVAL: io_destroy: invalid context id\n");
1020         return -EINVAL;
1021 }
1022
1023 static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
1024 {
1025         struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg];
1026
1027         BUG_ON(ret <= 0);
1028
1029         while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) {
1030                 ssize_t this = min((ssize_t)iov->iov_len, ret);
1031                 iov->iov_base += this;
1032                 iov->iov_len -= this;
1033                 iocb->ki_left -= this;
1034                 ret -= this;
1035                 if (iov->iov_len == 0) {
1036                         iocb->ki_cur_seg++;
1037                         iov++;
1038                 }
1039         }
1040
1041         /* the caller should not have done more io than what fit in
1042          * the remaining iovecs */
1043         BUG_ON(ret > 0 && iocb->ki_left == 0);
1044 }
1045
1046 typedef ssize_t (aio_rw_op)(struct kiocb *, const struct iovec *,
1047                             unsigned long, loff_t);
1048
1049 static ssize_t aio_rw_vect_retry(struct kiocb *iocb, int rw, aio_rw_op *rw_op)
1050 {
1051         struct file *file = iocb->ki_filp;
1052         struct address_space *mapping = file->f_mapping;
1053         struct inode *inode = mapping->host;
1054         ssize_t ret = 0;
1055
1056         /* This matches the pread()/pwrite() logic */
1057         if (iocb->ki_pos < 0)
1058                 return -EINVAL;
1059
1060         do {
1061                 ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
1062                             iocb->ki_nr_segs - iocb->ki_cur_seg,
1063                             iocb->ki_pos);
1064                 if (ret > 0)
1065                         aio_advance_iovec(iocb, ret);
1066
1067         /* retry all partial writes.  retry partial reads as long as its a
1068          * regular file. */
1069         } while (ret > 0 && iocb->ki_left > 0 &&
1070                  (rw == WRITE ||
1071                   (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
1072
1073         /* This means we must have transferred all that we could */
1074         /* No need to retry anymore */
1075         if ((ret == 0) || (iocb->ki_left == 0))
1076                 ret = iocb->ki_nbytes - iocb->ki_left;
1077
1078         /* If we managed to write some out we return that, rather than
1079          * the eventual error. */
1080         if (rw == WRITE
1081             && ret < 0 && ret != -EIOCBQUEUED
1082             && iocb->ki_nbytes - iocb->ki_left)
1083                 ret = iocb->ki_nbytes - iocb->ki_left;
1084
1085         return ret;
1086 }
1087
1088 static ssize_t aio_setup_vectored_rw(int rw, struct kiocb *kiocb, bool compat)
1089 {
1090         ssize_t ret;
1091
1092         kiocb->ki_nr_segs = kiocb->ki_nbytes;
1093
1094 #ifdef CONFIG_COMPAT
1095         if (compat)
1096                 ret = compat_rw_copy_check_uvector(rw,
1097                                 (struct compat_iovec __user *)kiocb->ki_buf,
1098                                 kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
1099                                 &kiocb->ki_iovec);
1100         else
1101 #endif
1102                 ret = rw_copy_check_uvector(rw,
1103                                 (struct iovec __user *)kiocb->ki_buf,
1104                                 kiocb->ki_nr_segs, 1, &kiocb->ki_inline_vec,
1105                                 &kiocb->ki_iovec);
1106         if (ret < 0)
1107                 return ret;
1108
1109         /* ki_nbytes now reflect bytes instead of segs */
1110         kiocb->ki_nbytes = ret;
1111         return 0;
1112 }
1113
1114 static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
1115 {
1116         if (unlikely(!access_ok(!rw, kiocb->ki_buf, kiocb->ki_nbytes)))
1117                 return -EFAULT;
1118
1119         kiocb->ki_iovec = &kiocb->ki_inline_vec;
1120         kiocb->ki_iovec->iov_base = kiocb->ki_buf;
1121         kiocb->ki_iovec->iov_len = kiocb->ki_nbytes;
1122         kiocb->ki_nr_segs = 1;
1123         return 0;
1124 }
1125
1126 /*
1127  * aio_setup_iocb:
1128  *      Performs the initial checks and aio retry method
1129  *      setup for the kiocb at the time of io submission.
1130  */
1131 static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
1132 {
1133         struct file *file = req->ki_filp;
1134         ssize_t ret;
1135         int rw;
1136         fmode_t mode;
1137         aio_rw_op *rw_op;
1138
1139         switch (req->ki_opcode) {
1140         case IOCB_CMD_PREAD:
1141         case IOCB_CMD_PREADV:
1142                 mode    = FMODE_READ;
1143                 rw      = READ;
1144                 rw_op   = file->f_op->aio_read;
1145                 goto rw_common;
1146
1147         case IOCB_CMD_PWRITE:
1148         case IOCB_CMD_PWRITEV:
1149                 mode    = FMODE_WRITE;
1150                 rw      = WRITE;
1151                 rw_op   = file->f_op->aio_write;
1152                 goto rw_common;
1153 rw_common:
1154                 if (unlikely(!(file->f_mode & mode)))
1155                         return -EBADF;
1156
1157                 if (!rw_op)
1158                         return -EINVAL;
1159
1160                 ret = (req->ki_opcode == IOCB_CMD_PREADV ||
1161                        req->ki_opcode == IOCB_CMD_PWRITEV)
1162                         ? aio_setup_vectored_rw(rw, req, compat)
1163                         : aio_setup_single_vector(rw, req);
1164                 if (ret)
1165                         return ret;
1166
1167                 ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes);
1168                 if (ret < 0)
1169                         return ret;
1170
1171                 req->ki_nbytes = ret;
1172                 req->ki_left = ret;
1173
1174                 ret = aio_rw_vect_retry(req, rw, rw_op);
1175                 break;
1176
1177         case IOCB_CMD_FDSYNC:
1178                 if (!file->f_op->aio_fsync)
1179                         return -EINVAL;
1180
1181                 ret = file->f_op->aio_fsync(req, 1);
1182                 break;
1183
1184         case IOCB_CMD_FSYNC:
1185                 if (!file->f_op->aio_fsync)
1186                         return -EINVAL;
1187
1188                 ret = file->f_op->aio_fsync(req, 0);
1189                 break;
1190
1191         default:
1192                 pr_debug("EINVAL: no operation provided\n");
1193                 return -EINVAL;
1194         }
1195
1196         if (ret != -EIOCBQUEUED) {
1197                 /*
1198                  * There's no easy way to restart the syscall since other AIO's
1199                  * may be already running. Just fail this IO with EINTR.
1200                  */
1201                 if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
1202                              ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
1203                         ret = -EINTR;
1204                 aio_complete(req, ret, 0);
1205         }
1206
1207         return 0;
1208 }
1209
1210 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
1211                          struct iocb *iocb, bool compat)
1212 {
1213         struct kiocb *req;
1214         ssize_t ret;
1215
1216         /* enforce forwards compatibility on users */
1217         if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
1218                 pr_debug("EINVAL: reserve field set\n");
1219                 return -EINVAL;
1220         }
1221
1222         /* prevent overflows */
1223         if (unlikely(
1224             (iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
1225             (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
1226             ((ssize_t)iocb->aio_nbytes < 0)
1227            )) {
1228                 pr_debug("EINVAL: io_submit: overflow check\n");
1229                 return -EINVAL;
1230         }
1231
1232         req = aio_get_req(ctx);
1233         if (unlikely(!req))
1234                 return -EAGAIN;
1235
1236         req->ki_filp = fget(iocb->aio_fildes);
1237         if (unlikely(!req->ki_filp)) {
1238                 ret = -EBADF;
1239                 goto out_put_req;
1240         }
1241
1242         if (iocb->aio_flags & IOCB_FLAG_RESFD) {
1243                 /*
1244                  * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
1245                  * instance of the file* now. The file descriptor must be
1246                  * an eventfd() fd, and will be signaled for each completed
1247                  * event using the eventfd_signal() function.
1248                  */
1249                 req->ki_eventfd = eventfd_ctx_fdget((int) iocb->aio_resfd);
1250                 if (IS_ERR(req->ki_eventfd)) {
1251                         ret = PTR_ERR(req->ki_eventfd);
1252                         req->ki_eventfd = NULL;
1253                         goto out_put_req;
1254                 }
1255         }
1256
1257         ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
1258         if (unlikely(ret)) {
1259                 pr_debug("EFAULT: aio_key\n");
1260                 goto out_put_req;
1261         }
1262
1263         req->ki_obj.user = user_iocb;
1264         req->ki_user_data = iocb->aio_data;
1265         req->ki_pos = iocb->aio_offset;
1266
1267         req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
1268         req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
1269         req->ki_opcode = iocb->aio_lio_opcode;
1270
1271         ret = aio_run_iocb(req, compat);
1272         if (ret)
1273                 goto out_put_req;
1274
1275         aio_put_req(req);       /* drop extra ref to req */
1276         return 0;
1277 out_put_req:
1278         put_reqs_available(ctx, 1);
1279         aio_put_req(req);       /* drop extra ref to req */
1280         aio_put_req(req);       /* drop i/o ref to req */
1281         return ret;
1282 }
1283
1284 long do_io_submit(aio_context_t ctx_id, long nr,
1285                   struct iocb __user *__user *iocbpp, bool compat)
1286 {
1287         struct kioctx *ctx;
1288         long ret = 0;
1289         int i = 0;
1290         struct blk_plug plug;
1291
1292         if (unlikely(nr < 0))
1293                 return -EINVAL;
1294
1295         if (unlikely(nr > LONG_MAX/sizeof(*iocbpp)))
1296                 nr = LONG_MAX/sizeof(*iocbpp);
1297
1298         if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
1299                 return -EFAULT;
1300
1301         ctx = lookup_ioctx(ctx_id);
1302         if (unlikely(!ctx)) {
1303                 pr_debug("EINVAL: invalid context id\n");
1304                 return -EINVAL;
1305         }
1306
1307         blk_start_plug(&plug);
1308
1309         /*
1310          * AKPM: should this return a partial result if some of the IOs were
1311          * successfully submitted?
1312          */
1313         for (i=0; i<nr; i++) {
1314                 struct iocb __user *user_iocb;
1315                 struct iocb tmp;
1316
1317                 if (unlikely(__get_user(user_iocb, iocbpp + i))) {
1318                         ret = -EFAULT;
1319                         break;
1320                 }
1321
1322                 if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
1323                         ret = -EFAULT;
1324                         break;
1325                 }
1326
1327                 ret = io_submit_one(ctx, user_iocb, &tmp, compat);
1328                 if (ret)
1329                         break;
1330         }
1331         blk_finish_plug(&plug);
1332
1333         put_ioctx(ctx);
1334         return i ? i : ret;
1335 }
1336
1337 /* sys_io_submit:
1338  *      Queue the nr iocbs pointed to by iocbpp for processing.  Returns
1339  *      the number of iocbs queued.  May return -EINVAL if the aio_context
1340  *      specified by ctx_id is invalid, if nr is < 0, if the iocb at
1341  *      *iocbpp[0] is not properly initialized, if the operation specified
1342  *      is invalid for the file descriptor in the iocb.  May fail with
1343  *      -EFAULT if any of the data structures point to invalid data.  May
1344  *      fail with -EBADF if the file descriptor specified in the first
1345  *      iocb is invalid.  May fail with -EAGAIN if insufficient resources
1346  *      are available to queue any iocbs.  Will return 0 if nr is 0.  Will
1347  *      fail with -ENOSYS if not implemented.
1348  */
1349 SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
1350                 struct iocb __user * __user *, iocbpp)
1351 {
1352         return do_io_submit(ctx_id, nr, iocbpp, 0);
1353 }
1354
1355 /* lookup_kiocb
1356  *      Finds a given iocb for cancellation.
1357  */
1358 static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
1359                                   u32 key)
1360 {
1361         struct list_head *pos;
1362
1363         assert_spin_locked(&ctx->ctx_lock);
1364
1365         if (key != KIOCB_KEY)
1366                 return NULL;
1367
1368         /* TODO: use a hash or array, this sucks. */
1369         list_for_each(pos, &ctx->active_reqs) {
1370                 struct kiocb *kiocb = list_kiocb(pos);
1371                 if (kiocb->ki_obj.user == iocb)
1372                         return kiocb;
1373         }
1374         return NULL;
1375 }
1376
1377 /* sys_io_cancel:
1378  *      Attempts to cancel an iocb previously passed to io_submit.  If
1379  *      the operation is successfully cancelled, the resulting event is
1380  *      copied into the memory pointed to by result without being placed
1381  *      into the completion queue and 0 is returned.  May fail with
1382  *      -EFAULT if any of the data structures pointed to are invalid.
1383  *      May fail with -EINVAL if aio_context specified by ctx_id is
1384  *      invalid.  May fail with -EAGAIN if the iocb specified was not
1385  *      cancelled.  Will fail with -ENOSYS if not implemented.
1386  */
1387 SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
1388                 struct io_event __user *, result)
1389 {
1390         struct io_event res;
1391         struct kioctx *ctx;
1392         struct kiocb *kiocb;
1393         u32 key;
1394         int ret;
1395
1396         ret = get_user(key, &iocb->aio_key);
1397         if (unlikely(ret))
1398                 return -EFAULT;
1399
1400         ctx = lookup_ioctx(ctx_id);
1401         if (unlikely(!ctx))
1402                 return -EINVAL;
1403
1404         spin_lock_irq(&ctx->ctx_lock);
1405
1406         kiocb = lookup_kiocb(ctx, iocb, key);
1407         if (kiocb)
1408                 ret = kiocb_cancel(ctx, kiocb, &res);
1409         else
1410                 ret = -EINVAL;
1411
1412         spin_unlock_irq(&ctx->ctx_lock);
1413
1414         if (!ret) {
1415                 /* Cancellation succeeded -- copy the result
1416                  * into the user's buffer.
1417                  */
1418                 if (copy_to_user(result, &res, sizeof(res)))
1419                         ret = -EFAULT;
1420         }
1421
1422         put_ioctx(ctx);
1423
1424         return ret;
1425 }
1426
1427 /* io_getevents:
1428  *      Attempts to read at least min_nr events and up to nr events from
1429  *      the completion queue for the aio_context specified by ctx_id. If
1430  *      it succeeds, the number of read events is returned. May fail with
1431  *      -EINVAL if ctx_id is invalid, if min_nr is out of range, if nr is
1432  *      out of range, if timeout is out of range.  May fail with -EFAULT
1433  *      if any of the memory specified is invalid.  May return 0 or
1434  *      < min_nr if the timeout specified by timeout has elapsed
1435  *      before sufficient events are available, where timeout == NULL
1436  *      specifies an infinite timeout. Note that the timeout pointed to by
1437  *      timeout is relative and will be updated if not NULL and the
1438  *      operation blocks. Will fail with -ENOSYS if not implemented.
1439  */
1440 SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
1441                 long, min_nr,
1442                 long, nr,
1443                 struct io_event __user *, events,
1444                 struct timespec __user *, timeout)
1445 {
1446         struct kioctx *ioctx = lookup_ioctx(ctx_id);
1447         long ret = -EINVAL;
1448
1449         if (likely(ioctx)) {
1450                 if (likely(min_nr <= nr && min_nr >= 0))
1451                         ret = read_events(ioctx, min_nr, nr, events, timeout);
1452                 put_ioctx(ioctx);
1453         }
1454         return ret;
1455 }