struct {
struct mutex ring_lock;
wait_queue_head_t wait;
+
+ /*
+ * Copy of the real tail - to reduce cacheline bouncing. Updated
+ * by aio_complete() whenever it updates the real tail.
+ */
+ unsigned shadow_tail;
} ____cacheline_aligned_in_smp;
struct {
+ /*
+ * This is the canonical copy of the tail pointer, updated only by
+ * aio_complete(), which also uses it as a lock; nothing else may
+ * touch it directly. For everyone else, aio_complete() mirrors each
+ * update into shadow_tail.
+ */
unsigned tail;
- spinlock_t completion_lock;
} ____cacheline_aligned_in_smp;
struct page *internal_pages[AIO_RING_PAGES];
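The two ____cacheline_aligned_in_smp blocks above exist so that the fields readers poll (wait, shadow_tail) and the field aio_complete() keeps dirtying (tail) land on separate cachelines. A minimal userspace sketch of that layout, assuming a 64-byte line and GCC-style attributes (none of these names are in the patch):

	#define CACHELINE 64			/* assumed L1 line size */

	struct ring_ctx_sketch {
		struct {			/* read side: waiters poll this */
			unsigned shadow_tail;	/* mirror written by the completer */
		} reader __attribute__((aligned(CACHELINE)));

		struct {			/* write side: owned by the completer */
			unsigned tail;		/* canonical value, doubles as a lock */
		} completer __attribute__((aligned(CACHELINE)));
	};

Because each block starts on its own line, readers spinning on shadow_tail never pull in the cacheline that aio_complete() is bouncing with tail updates.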
kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_available) < ctx->nr_events - 1) {
- wait_event(ctx->wait, head != ctx->tail);
+ wait_event(ctx->wait, head != ctx->shadow_tail);
- avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
+ avail = (head <= ctx->shadow_tail
+ ? ctx->shadow_tail : ctx->nr_events) - head;
atomic_add(avail, &ctx->reqs_available);
head += avail;
rcu_read_unlock();
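The avail computation above is the usual two-pass handling of ring wraparound: when head is at or behind shadow_tail, everything up to shadow_tail is available; when the tail has already wrapped past the end of the ring, only the entries up to nr_events are taken on this pass and the rest are picked up after head wraps back to 0. A hypothetical standalone helper (not in the patch) spelling out the same expression:

	/* events consumable in one pass from a ring of nr_events entries */
	static unsigned events_avail(unsigned head, unsigned shadow_tail,
				     unsigned nr_events)
	{
		return (head <= shadow_tail ? shadow_tail : nr_events) - head;
	}

For example, with nr_events = 128, head = 120 and shadow_tail = 5, the first pass yields 8 (slots 120..127); after head advances and wraps to 0, the second pass yields the remaining 5.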
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
* free_ioctx()
*/
atomic_inc(&ctx->reqs_available);
+ /* order the increment before the lockless waitqueue_active() check */
+ smp_mb__after_atomic_inc();
/* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
}
/*
- * Add a completion event to the ring buffer. Must be done holding
- * ctx->ctx_lock to prevent other code from messing with the tail
- * pointer since we might be called from irq context.
+ * Add a completion event to the ring buffer; ctx->tail is both our lock
+ * and the canonical version of the tail pointer.
*/
- spin_lock_irqsave(&ctx->completion_lock, flags);
+ local_irq_save(flags);
+ while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
+ cpu_relax();
- tail = ctx->tail;
pos = tail + AIO_EVENTS_OFFSET;
if (++tail >= ctx->nr_events)
*/
smp_wmb(); /* make event visible before updating tail */
- ctx->tail = tail;
+ ctx->shadow_tail = tail;
ring = kmap_atomic(ctx->ring_pages[0]);
ring->tail = tail;
kunmap_atomic(ring);
flush_dcache_page(ctx->ring_pages[0]);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ /* unlock, make new tail visible before checking waitlist */
+ smp_mb();
+
+ ctx->tail = tail;
+ local_irq_restore(flags);
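Taken together, the xchg() loop and the final store above form a small ad-hoc spinlock: UINT_MAX is the "locked" value, the winner of the xchg() gets the current tail, and writing the new tail back both publishes it and releases the lock (interrupts stay disabled for the duration since aio_complete() can run from irq context). A rough userspace equivalent of the pattern using C11 atomics, purely as an illustration and not the kernel code:

	#include <limits.h>
	#include <stdatomic.h>

	static _Atomic unsigned ring_tail;	/* canonical tail; UINT_MAX == locked */

	static unsigned tail_lock(void)
	{
		unsigned tail;

		/* spin until we swap in UINT_MAX and see a real tail value */
		while ((tail = atomic_exchange(&ring_tail, UINT_MAX)) == UINT_MAX)
			;	/* a pause/cpu_relax() hint would go here */
		return tail;
	}

	static void tail_unlock(unsigned new_tail)
	{
		/* seq_cst store: publish the ring update and drop the lock;
		 * the patch uses smp_mb() plus a plain store instead */
		atomic_store(&ring_tail, new_tail);
	}

One consequence of reusing the tail as the lock word is that anyone naively loading ctx->tail could observe UINT_MAX mid-update, which is exactly why all other code reads shadow_tail.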
pr_debug("added to ring %p at [%u]\n", iocb, tail);
/* everything turned out well, dispose of the aiocb. */
aio_put_req(iocb);
- /*
- * We have to order our ring_info tail store above and test
- * of the wait list below outside the wait lock. This is
- * like in wake_up_bit() where clearing a bit has to be
- * ordered with the unlocked test.
- */
- smp_mb();
-
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
head = ring->head;
kunmap_atomic(ring);
- pr_debug("h%u t%u m%u\n", head, ctx->tail, ctx->nr_events);
+ pr_debug("h%u t%u m%u\n", head, ctx->shadow_tail, ctx->nr_events);
- if (head == ctx->tail)
+ if (head == ctx->shadow_tail)
goto out;
while (ret < nr) {
struct io_event *ev;
struct page *page;
- avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
- if (head == ctx->tail)
+ avail = (head <= ctx->shadow_tail ?
+ ctx->shadow_tail : ctx->nr_events) - head;
+ if (head == ctx->shadow_tail)
break;
avail = min(avail, nr - ret);
kunmap_atomic(ring);
flush_dcache_page(ctx->ring_pages[0]);
- pr_debug("%li h%u t%u\n", ret, head, ctx->tail);
+ pr_debug("%li h%u t%u\n", ret, head, ctx->shadow_tail);
put_reqs_available(ctx, ret);
out: