struct __percpu kioctx_cpu *cpu;
- unsigned req_batch;
-
- unsigned nr;
+ /* Size of ringbuffer, in units of struct io_event */
+ unsigned nr_events;
- /* sys_io_setup currently limits this to an unsigned int */
+ /*
+ * Maximum number of outstanding requests:
+ * sys_io_setup currently limits this to an unsigned int
+ */
unsigned max_reqs;
+ /*
+ * For percpu reqs_available, number of slots we move to/from global
+ * counter at a time:
+ */
+ unsigned req_batch;
+
unsigned long mmap_base;
unsigned long mmap_size;
struct work_struct rcu_work;
struct {
+ /*
+ * This counts the number of available slots in the ringbuffer,
+ * so we avoid overflowing it: it's decremented (if positive)
+ * when allocating a kiocb and incremented when the resulting
+ * io_event is pulled off the ringbuffer.
+ *
+ * We batch accesses to it with a percpu version.
+ */
atomic_t reqs_available;
} ____cacheline_aligned_in_smp;
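Taken together, the comments above describe a two-level counter: the global atomic_t reqs_available plus a small per-cpu pool, with slots moved between the two req_batch at a time so the shared atomic is only touched once per batch. A minimal sketch of what the allocation side of that could look like follows; the kioctx_cpu layout and the get_reqs_available() helper shown here are illustrative assumptions, not lines quoted from this patch:

struct kioctx_cpu {
	unsigned		reqs_available;	/* slots reserved for this cpu */
};

/*
 * Illustrative sketch: reserve one request slot, refilling the local pool
 * from the global counter in req_batch-sized chunks when it runs dry.
 */
static bool get_reqs_available(struct kioctx *ctx)
{
	struct kioctx_cpu *kcpu;
	bool ret = false;

	preempt_disable();
	kcpu = this_cpu_ptr(ctx->cpu);

	if (!kcpu->reqs_available) {
		int old, avail = atomic_read(&ctx->reqs_available);

		do {
			if (avail < ctx->req_batch)
				goto out;

			old = avail;
			avail = atomic_cmpxchg(&ctx->reqs_available,
					       avail, avail - ctx->req_batch);
		} while (avail != old);

		kcpu->reqs_available += ctx->req_batch;
	}

	ret = true;
	kcpu->reqs_available--;
out:
	preempt_enable();
	return ret;
}

The completion side would do the inverse: bump the local counter as io_events are consumed and push slots back to the global atomic once the local pool grows past a couple of batches, so no cpu can hoard the whole ring.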
unsigned long size, populate;
int nr_pages;
- nr_events = max(nr_events, num_possible_cpus() * 4);
- nr_events *= 2;
-
/* Compensate for the ring buffer's head/tail overlap entry */
nr_events += 2; /* 1 is required, 2 for good luck */
nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
- ctx->nr = 0;
+ ctx->nr_events = 0;
ctx->ring_pages = ctx->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
mm_populate(ctx->mmap_base, populate);
ctx->user_id = ctx->mmap_base;
- ctx->nr = nr_events; /* trusted copy */
+ ctx->nr_events = nr_events; /* trusted copy */
ring = kmap_atomic(ctx->ring_pages[0]);
ring->nr = nr_events; /* user copy */
head = ring->head;
kunmap_atomic(ring);
- while (atomic_read(&ctx->reqs_available) < ctx->nr) {
+ while (atomic_read(&ctx->reqs_available) < ctx->max_reqs) {
wait_event(ctx->wait,
(head != ctx->shadow_tail) ||
- (atomic_read(&ctx->reqs_available) >= ctx->nr));
+ (atomic_read(&ctx->reqs_available) >= ctx->max_reqs));
avail = (head <= ctx->shadow_tail ?
- ctx->shadow_tail : ctx->nr) - head;
+ ctx->shadow_tail : ctx->nr_events) - head;
atomic_add(avail, &ctx->reqs_available);
head += avail;
- head %= ctx->nr;
+ head %= ctx->nr_events;
}
- WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr);
+ WARN_ON(atomic_read(&ctx->reqs_available) > ctx->max_reqs);
aio_free_ring(ctx);
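The drain loop above, like the event-reading loop further down, computes avail as the length of the contiguous run of events starting at head: up to the shadow tail if the ring has not wrapped, otherwise up to the end of the ring, with the remainder picked up on the next pass once head has been reduced modulo nr_events. A standalone illustration of that index arithmetic (the numbers are made up):

/*
 * Contiguous events starting at head, for a ring of nr_events = 128:
 *
 *   head = 10,  tail = 50  ->  avail = 50 - 10   = 40  (no wrap)
 *   head = 100, tail = 20  ->  avail = 128 - 100 = 28  (wrapped; the
 *                              remaining 20 are consumed next time round)
 */
static unsigned contiguous_events(unsigned head, unsigned tail,
				  unsigned nr_events)
{
	return (head <= tail ? tail : nr_events) - head;
}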
struct kioctx *ctx;
int err = -ENOMEM;
+ /*
+ * We keep track of the number of available ringbuffer slots, to prevent
+ * overflow (reqs_available), and we also use percpu counters for this.
+ *
+ * So since up to half the slots might be on other cpus' percpu counters
+ * and unavailable, double nr_events so userspace sees what they
+ * expected: additionally, we move req_batch slots to/from percpu
+ * counters at a time, so make sure that isn't 0:
+ */
+ nr_events = max(nr_events, num_possible_cpus() * 4);
+ nr_events *= 2;
+
/* Prevent overflows */
if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
(nr_events > (0x10000000U / sizeof(struct kiocb)))) {
return ERR_PTR(-ENOMEM);
ctx->max_reqs = nr_events;
+ atomic_set(&ctx->reqs_available, nr_events);
+ ctx->req_batch = nr_events / (num_possible_cpus() * 4);
percpu_ref_init(&ctx->users);
rcu_read_lock();
if (aio_setup_ring(ctx) < 0)
goto out_freepcpu;
- atomic_set(&ctx->reqs_available, ctx->nr);
- ctx->req_batch = ctx->nr / (num_possible_cpus() * 4);
- BUG_ON(!ctx->req_batch);
-
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
if (aio_nr + nr_events > aio_max_nr ||
spin_unlock(&mm->ioctx_lock);
pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
- ctx, ctx->user_id, mm, ctx->nr);
+ ctx, ctx->user_id, mm, ctx->nr_events);
return ctx;
out_cleanup:
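To make the sizing in ioctx_alloc concrete, a worked example (the CPU count and the io_setup() arguments are made up):

	io_setup(128) on a box with 4 possible CPUs:
		nr_events = max(128, 4 * 4) * 2 = 256
		max_reqs  = initial reqs_available = 256
		req_batch = 256 / (4 * 4) = 16

	io_setup(2) on the same box:
		nr_events = max(2, 16) * 2 = 32
		req_batch = 32 / 16 = 2

Because nr_events is clamped to at least num_possible_cpus() * 4 and then doubled before max_reqs and req_batch are derived from it, req_batch is always at least 2, which is why the old BUG_ON(!ctx->req_batch) can simply be dropped.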
struct io_event *ev_page, *event;
unsigned pos = tail + AIO_EVENTS_OFFSET;
- if (++tail >= ctx->nr)
+ if (++tail >= ctx->nr_events)
tail = 0;
ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
head = ring->head;
kunmap_atomic(ring);
- pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr);
+ pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr_events);
if (head == ctx->shadow_tail)
goto out;
while (ret < nr) {
long avail = (head <= ctx->shadow_tail
- ? ctx->shadow_tail : ctx->nr) - head;
+ ? ctx->shadow_tail : ctx->nr_events) - head;
struct io_event *ev;
struct page *page;
ret += avail;
head += avail;
- head %= ctx->nr;
+ head %= ctx->nr_events;
}
ring = kmap_atomic(ctx->ring_pages[0]);