#define AIO_RING_MAGIC 0xa10a10a1
#define AIO_RING_COMPAT_FEATURES 1
+#define AIO_RING_COMPAT_THREADED 2
#define AIO_RING_INCOMPAT_FEATURES 0
struct aio_ring {
unsigned id; /* kernel internal index number */
/* ... */
};

struct kioctx {
/* ... */
struct file *aio_ring_file;
unsigned id;
+ struct mm_struct *mm;
};
+struct aio_kiocb;
+typedef long (*aio_thread_work_fn_t)(struct aio_kiocb *iocb);
+
/*
 * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
 * cancelled or completed (this makes a certain amount of sense because
 * synchronizing cancellation and completion to the same thing avoids
 * needing separate reference counts for each).
 */
#define KIOCB_CANCELLED ((void *) (~0ULL))

struct aio_kiocb {
struct kiocb common;
/* ... */

/*
 * If the aio_resfd field of the userspace iocb is not zero,
 * this is the underlying eventfd context to deliver events to.
 */
struct eventfd_ctx *ki_eventfd;
+
+ struct iov_iter ki_iter;
+ struct iovec *ki_iovec;
+ struct iovec ki_inline_vecs[UIO_FASTIOV];
+
+ /* Fields used by the threaded aio helpers. */
+ struct task_struct *ki_submit_task;
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ struct task_struct *ki_cancel_task;
+ unsigned long ki_data;
+ unsigned long ki_rlimit_fsize;
+ aio_thread_work_fn_t ki_work_fn;
+ struct work_struct ki_work;
+#endif
};
/*------ sysctl variables----*/
static DEFINE_SPINLOCK(aio_nr_lock);
unsigned long aio_nr; /* current system wide number of aio requests */
unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+unsigned long aio_auto_threads = 0; /* Currently disabled by default */
+#endif
/*----end sysctl variables---*/
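A minimal sketch of how the new knob could be exposed, assuming an entry
alongside aio-nr and aio-max-nr in the fs_table of kernel/sysctl.c (the
procname here is illustrative, not taken from this patch):

#if IS_ENABLED(CONFIG_AIO_THREAD)
	{
		.procname	= "aio-auto-threads",
		.data		= &aio_auto_threads,
		.maxlen		= sizeof(aio_auto_threads),
		.mode		= 0644,
		.proc_handler	= proc_doulongvec_minmax,
	},
#endif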
static struct kmem_cache *kiocb_cachep;
static const struct file_operations aio_ring_fops;
static const struct address_space_operations aio_ctx_aops;
+static void aio_complete(struct kiocb *kiocb, long res, long res2);
+
static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
{
struct qstr this = QSTR_INIT("[aio]", 5);
ring->head = ring->tail = 0;
ring->magic = AIO_RING_MAGIC;
ring->compat_features = AIO_RING_COMPAT_FEATURES;
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ if (aio_auto_threads & 1)
+ ring->compat_features |= AIO_RING_COMPAT_THREADED;
+#endif
ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
ring->header_length = sizeof(struct aio_ring);
kunmap_atomic(ring);
return cancel(&kiocb->common);
}
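Because compat_features is visible through the mapped ring, userspace can
probe for the threaded implementation. An illustrative sketch (libaio's
io_context_t points at this ring mapping; aio_ring_hdr below just mirrors
the header layout of struct aio_ring):

struct aio_ring_hdr {
	unsigned id, nr, head, tail;
	unsigned magic, compat_features, incompat_features, header_length;
};

static int aio_ring_is_threaded(io_context_t ctx)
{
	const struct aio_ring_hdr *hdr = (const void *)ctx;

	return hdr->magic == 0xa10a10a1 &&	/* AIO_RING_MAGIC */
	       (hdr->compat_features & 2);	/* AIO_RING_COMPAT_THREADED */
}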
+struct mm_struct *aio_get_mm(struct kiocb *req)
+{
+ if (req->ki_complete == aio_complete) {
+ struct aio_kiocb *iocb;
+ iocb = container_of(req, struct aio_kiocb, common);
+ return iocb->ki_ctx->mm;
+ }
+ return NULL;
+}
+
+struct task_struct *aio_get_task(struct kiocb *req)
+{
+ if (req->ki_complete == aio_complete) {
+ struct aio_kiocb *iocb;
+ iocb = container_of(req, struct aio_kiocb, common);
+ return iocb->ki_submit_task;
+ }
+ return current;
+}
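These accessors let lower layers tell aio-originated kiocbs apart from
synchronous ones. A hypothetical caller sketch (foo_read_iter and its use
of the result are illustrative only):

static ssize_t foo_read_iter(struct kiocb *kiocb, struct iov_iter *to)
{
	struct mm_struct *mm = aio_get_mm(kiocb);

	if (mm) {
		/*
		 * Submitted via aio: if completion is punted to another
		 * thread, that thread must use_mm(mm) before touching
		 * the submitter's buffers.
		 */
	}
	return 0;	/* illustrative */
}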
+
static void free_ioctx(struct work_struct *work)
{
struct kioctx *ctx = container_of(work, struct kioctx, free_work);
return ERR_PTR(-ENOMEM);
ctx->max_reqs = nr_events;
+ ctx->mm = mm;
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
percpu_ref_get(&ctx->reqs);
req->ki_ctx = ctx;
+ req->ki_iovec = req->ki_inline_vecs;
return req;
out_put:
put_reqs_available(ctx, 1);
fput(req->common.ki_filp);
if (req->ki_eventfd != NULL)
eventfd_ctx_put(req->ki_eventfd);
+ if (req->ki_iovec != req->ki_inline_vecs)
+ kfree(req->ki_iovec);
+ if (req->ki_submit_task)
+ put_task_struct(req->ki_submit_task);
kmem_cache_free(kiocb_cachep, req);
}
if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
return -EFAULT;
+ if (!timespec_valid(&ts))
+ return -EINVAL;
until = timespec_to_ktime(ts);
}
len, UIO_FASTIOV, iovec, iter);
}
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+/* aio_thread_queue_iocb_cancel_early:
+ * Early stage cancellation helper function for threaded aios. This
+ * is used prior to the iocb being assigned to a worker thread.
+ */
+static int aio_thread_queue_iocb_cancel_early(struct kiocb *iocb)
+{
+ return 0;
+}
+
+/* aio_thread_queue_iocb_cancel:
+ * Late stage cancellation method for threaded aios. Once an iocb is
+ * assigned to a worker thread, we use a fatal signal to interrupt an
+ * in-progress operation.
+ */
+static int aio_thread_queue_iocb_cancel(struct kiocb *kiocb)
+{
+ struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, common);
+ if (iocb->ki_cancel_task) {
+ force_sig(SIGKILL, iocb->ki_cancel_task);
+ return 0;
+ }
+ return -EAGAIN;
+}
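For reference, the cancel side is the existing kiocb_cancel() (its tail
appears earlier in this excerpt): it claims the iocb by swapping
KIOCB_CANCELLED into ki_cancel before invoking whichever callback it found
there, so exactly one of canceller and worker wins the race against the
cmpxchg in aio_thread_fn() below. A sketch of that stock logic:

static int kiocb_cancel(struct aio_kiocb *kiocb)
{
	kiocb_cancel_fn *old, *cancel;

	cancel = ACCESS_ONCE(kiocb->ki_cancel);
	do {
		if (!cancel || cancel == KIOCB_CANCELLED)
			return -EINVAL;
		old = cancel;
		/* First to install KIOCB_CANCELLED owns cancellation. */
		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
	} while (cancel != old);

	return cancel(&kiocb->common);
}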
+
+/* aio_thread_fn:
+ * Entry point for worker to perform threaded aio. Handles issues
+ * arising due to cancellation using signals.
+ */
+static void aio_thread_fn(struct work_struct *work)
+{
+ struct aio_kiocb *iocb = container_of(work, struct aio_kiocb, ki_work);
+ kiocb_cancel_fn *old_cancel;
+ long ret;
+
+ iocb->ki_cancel_task = current;
+ current->kiocb = &iocb->common; /* For io_send_sig(). */
+ BUG_ON(atomic_read(&current->signal->sigcnt) != 1);
+
+ /* Check for early stage cancellation and switch to late stage
+ * cancellation if it has not already occurred.
+ */
+ old_cancel = cmpxchg(&iocb->ki_cancel,
+ aio_thread_queue_iocb_cancel_early,
+ aio_thread_queue_iocb_cancel);
+ if (old_cancel != KIOCB_CANCELLED)
+ ret = iocb->ki_work_fn(iocb);
+ else
+ ret = -EINTR;
+
+ current->kiocb = NULL;
+ if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+ ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
+ ret = -EINTR;
+
+ /* Completion serializes cancellation by taking ctx_lock, so
+ * aio_complete() will not return until after force_sig() in
+ * aio_thread_queue_iocb_cancel(). This should ensure that
+ * the signal is pending before being flushed in this thread.
+ */
+ aio_complete(&iocb->common, ret, 0);
+ if (fatal_signal_pending(current))
+ flush_signals(current);
+}
+
+#define AIO_THREAD_NEED_TASK 0x0001 /* Need aio_kiocb->ki_submit_task */
+
+/* aio_thread_queue_iocb:
+ * Queues an aio_kiocb for dispatch to a worker thread. Prepares the
+ * aio_kiocb for cancellation. The caller must provide the function to
+ * execute the operation in work_fn. Flags may be provided as an ORed
+ * set of AIO_THREAD_xxx values.
+ */
+static ssize_t aio_thread_queue_iocb(struct aio_kiocb *iocb,
+ aio_thread_work_fn_t work_fn,
+ unsigned flags)
+{
+ INIT_WORK(&iocb->ki_work, aio_thread_fn);
+ iocb->ki_work_fn = work_fn;
+ if (flags & AIO_THREAD_NEED_TASK) {
+ iocb->ki_submit_task = current;
+ get_task_struct(iocb->ki_submit_task);
+ }
+
+ /* Cancellation needs to be always available for operations performed
+ * using helper threads. Prior to the iocb being assigned to a worker
+ * thread, we need to record that a cancellation has occurred. We
+ * can do this by having a minimal helper function that is recorded in
+ * ki_cancel.
+ */
+ kiocb_set_cancel_fn(&iocb->common, aio_thread_queue_iocb_cancel_early);
+ queue_work(system_long_wq, &iocb->ki_work);
+ return -EIOCBQUEUED;
+}
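Usage sketch (aio_thread_op_nop and example_submit are hypothetical): op
handlers simply return the helper's result, so -EIOCBQUEUED propagates
back through aio_run_iocb() and the real completion arrives later via
aio_complete() in aio_thread_fn().

static long aio_thread_op_nop(struct aio_kiocb *iocb)
{
	return 0;	/* becomes the iocb's result code */
}

static ssize_t example_submit(struct aio_kiocb *req)
{
	return aio_thread_queue_iocb(req, aio_thread_op_nop, 0);
}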
+
+static long aio_thread_op_read_iter(struct aio_kiocb *iocb)
+{
+ struct file *filp;
+ long ret;
+
+ use_mm(iocb->ki_ctx->mm);
+ filp = iocb->common.ki_filp;
+
+ if (filp->f_op->read_iter) {
+ struct kiocb sync_kiocb;
+ init_sync_kiocb(&sync_kiocb, filp);
+ sync_kiocb.ki_pos = iocb->common.ki_pos;
+ ret = filp->f_op->read_iter(&sync_kiocb, &iocb->ki_iter);
+ } else if (filp->f_op->read)
+ ret = do_loop_readv_writev(filp, &iocb->ki_iter,
+ &iocb->common.ki_pos,
+ filp->f_op->read);
+ else
+ ret = -EINVAL;
+ unuse_mm(iocb->ki_ctx->mm);
+ return ret;
+}
+
+ssize_t generic_async_read_iter_non_direct(struct kiocb *iocb,
+ struct iov_iter *iter)
+{
+ if ((iocb->ki_flags & IOCB_DIRECT) ||
+ (iocb->ki_complete != aio_complete))
+ return iocb->ki_filp->f_op->read_iter(iocb, iter);
+ return generic_async_read_iter(iocb, iter);
+}
+EXPORT_SYMBOL(generic_async_read_iter_non_direct);
+
+ssize_t generic_async_read_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+ struct aio_kiocb *req;
+
+ req = container_of(iocb, struct aio_kiocb, common);
+ BUG_ON(iter != &req->ki_iter);
+
+ return aio_thread_queue_iocb(req, aio_thread_op_read_iter,
+ AIO_THREAD_NEED_TASK);
+}
+EXPORT_SYMBOL(generic_async_read_iter);
+
+static long aio_thread_op_write_iter(struct aio_kiocb *iocb)
+{
+ u64 saved_rlim_fsize;
+ struct file *filp;
+ long ret;
+
+ use_mm(iocb->ki_ctx->mm);
+ filp = iocb->common.ki_filp;
+ saved_rlim_fsize = rlimit(RLIMIT_FSIZE);
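+ /* The worker's signal_struct is unshared (see the sigcnt assertion in
+ * aio_thread_fn()), so the submitter's RLIMIT_FSIZE can be adopted here
+ * without taking siglock. */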
+ current->signal->rlim[RLIMIT_FSIZE].rlim_cur = iocb->ki_rlimit_fsize;
+
+ if (filp->f_op->write_iter) {
+ struct kiocb sync_kiocb;
+ init_sync_kiocb(&sync_kiocb, filp);
+ sync_kiocb.ki_pos = iocb->common.ki_pos;
+ ret = filp->f_op->write_iter(&sync_kiocb, &iocb->ki_iter);
+ } else if (filp->f_op->write)
+ ret = do_loop_readv_writev(filp, &iocb->ki_iter,
+ &iocb->common.ki_pos,
+ (io_fn_t)filp->f_op->write);
+ else
+ ret = -EINVAL;
+ current->signal->rlim[RLIMIT_FSIZE].rlim_cur = saved_rlim_fsize;
+ unuse_mm(iocb->ki_ctx->mm);
+ return ret;
+}
+
+ssize_t generic_async_write_iter_non_direct(struct kiocb *iocb,
+ struct iov_iter *iter)
+{
+ if ((iocb->ki_flags & IOCB_DIRECT) ||
+ (iocb->ki_complete != aio_complete))
+ return iocb->ki_filp->f_op->write_iter(iocb, iter);
+ return generic_async_write_iter(iocb, iter);
+}
+EXPORT_SYMBOL(generic_async_write_iter_non_direct);
+
+ssize_t generic_async_write_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+ struct aio_kiocb *req;
+
+ req = container_of(iocb, struct aio_kiocb, common);
+ BUG_ON(iter != &req->ki_iter);
+ req->ki_rlimit_fsize = rlimit(RLIMIT_FSIZE);
+
+ return aio_thread_queue_iocb(req, aio_thread_op_write_iter,
+ AIO_THREAD_NEED_TASK);
+}
+EXPORT_SYMBOL(generic_async_write_iter);
+
+static long aio_thread_op_fsync(struct aio_kiocb *iocb)
+{
+ return vfs_fsync(iocb->common.ki_filp, 0);
+}
+
+static long aio_thread_op_fdatasync(struct aio_kiocb *iocb)
+{
+ return vfs_fsync(iocb->common.ki_filp, 1);
+}
+
+ssize_t generic_async_fsync(struct kiocb *iocb, int datasync)
+{
+ struct aio_kiocb *req;
+
+ BUG_ON(iocb->ki_complete != aio_complete);
+ req = container_of(iocb, struct aio_kiocb, common);
+
+ return aio_thread_queue_iocb(req, datasync ? aio_thread_op_fdatasync
+ : aio_thread_op_fsync, 0);
+}
+EXPORT_SYMBOL(generic_async_fsync);
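With the helpers exported, a filesystem opts in by pointing the new
file_operations hooks at them (the async_read_iter/async_write_iter
member names come from their use in aio_run_iocb() below; example_fops
itself is illustrative):

static const struct file_operations example_fops = {
	.llseek		  = generic_file_llseek,
	.read_iter	  = generic_file_read_iter,
	.write_iter	  = generic_file_write_iter,
	/* Route buffered (non-O_DIRECT) aio through the thread helpers: */
	.async_read_iter  = generic_async_read_iter_non_direct,
	.async_write_iter = generic_async_write_iter_non_direct,
};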
+
+static long aio_thread_op_poll(struct aio_kiocb *iocb)
+{
+ struct file *file = iocb->common.ki_filp;
+ short events = iocb->ki_data;
+ struct poll_wqueues table;
+ unsigned int mask;
+ ssize_t ret = 0;
+
+ poll_initwait(&table);
+ events |= POLLERR | POLLHUP;
+
+ for (;;) {
+ mask = DEFAULT_POLLMASK;
+ if (file->f_op && file->f_op->poll) {
+ table.pt._key = events;
+ mask = file->f_op->poll(file, &table.pt);
+ }
+ /* Mask out unneeded events. */
+ mask &= events;
+ ret = mask;
+ if (mask)
+ break;
+
+ ret = -EINTR;
+ if (signal_pending(current))
+ break;
+
+ poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, NULL, 0);
+ }
+
+ poll_freewait(&table);
+ return ret;
+}
+#endif /* IS_ENABLED(CONFIG_AIO_THREAD) */
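An illustrative userspace submission for the new opcode, assuming the
requested event mask is carried in the iocb's aio_buf field and copied
into ki_data during submission (that plumbing is outside this excerpt):

struct iocb cb = {
	.aio_fildes	= fd,
	.aio_lio_opcode	= IOCB_CMD_POLL,
	.aio_buf	= POLLIN,	/* assumed field mapping */
};
struct iocb *cbs[1] = { &cb };

io_submit(ctx, 1, cbs);	/* completes when fd is readable or on error */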
+
/*
* aio_run_iocb:
* Performs the initial checks and io submission.
*/
-static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode,
+static ssize_t aio_run_iocb(struct aio_kiocb *req, unsigned opcode,
char __user *buf, size_t len, bool compat)
{
- struct file *file = req->ki_filp;
- ssize_t ret;
+ struct file *file = req->common.ki_filp;
+ ssize_t ret = -EINVAL;
int rw;
fmode_t mode;
rw_iter_op *iter_op;
- struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
- struct iov_iter iter;
switch (opcode) {
case IOCB_CMD_PREAD:
case IOCB_CMD_PREADV:
mode = FMODE_READ;
rw = READ;
+ iter_op = file->f_op->async_read_iter;
+ if (iter_op)
+ goto rw_common;
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ if ((aio_auto_threads & 1) &&
+ (file->f_op->read_iter || file->f_op->read)) {
+ iter_op = generic_async_read_iter;
+ goto rw_common;
+ }
+#endif
iter_op = file->f_op->read_iter;
goto rw_common;
case IOCB_CMD_PWRITE:
case IOCB_CMD_PWRITEV:
mode = FMODE_WRITE;
rw = WRITE;
+ iter_op = file->f_op->async_write_iter;
+ if (iter_op)
+ goto rw_common;
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ if ((aio_auto_threads & 1) &&
+ (file->f_op->write_iter || file->f_op->write)) {
+ iter_op = generic_async_write_iter;
+ goto rw_common;
+ }
+#endif
iter_op = file->f_op->write_iter;
goto rw_common;
rw_common:
if (opcode == IOCB_CMD_PREADV || opcode == IOCB_CMD_PWRITEV)
ret = aio_setup_vectored_rw(rw, buf, len,
- &iovec, compat, &iter);
+ &req->ki_iovec, compat,
+ &req->ki_iter);
else {
- ret = import_single_range(rw, buf, len, iovec, &iter);
- iovec = NULL;
+ ret = import_single_range(rw, buf, len, req->ki_iovec,
+ &req->ki_iter);
}
if (!ret)
- ret = rw_verify_area(rw, file, &req->ki_pos,
- iov_iter_count(&iter));
- if (ret < 0) {
- kfree(iovec);
+ ret = rw_verify_area(rw, file, &req->common.ki_pos,
+ iov_iter_count(&req->ki_iter));
+ if (ret < 0)
return ret;
- }
-
- len = ret;
if (rw == WRITE)
file_start_write(file);
- ret = iter_op(req, &iter);
+ ret = iter_op(&req->common, &req->ki_iter);
if (rw == WRITE)
file_end_write(file);
- kfree(iovec);
break;
case IOCB_CMD_FDSYNC:
- if (!file->f_op->aio_fsync)
- return -EINVAL;
-
- ret = file->f_op->aio_fsync(req, 1);
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(&req->common, 1);
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ else if (file->f_op->fsync && (aio_auto_threads & 1))
+ ret = generic_async_fsync(&req->common, 1);
+#endif
break;
case IOCB_CMD_FSYNC:
- if (!file->f_op->aio_fsync)
- return -EINVAL;
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(&req->common, 0);
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ else if (file->f_op->fsync && (aio_auto_threads & 1))
+ ret = generic_async_fsync(&req->common, 0);
+#endif
+ break;
- ret = file->f_op->aio_fsync(req, 0);
+ case IOCB_CMD_POLL:
+#if IS_ENABLED(CONFIG_AIO_THREAD)
+ if (aio_auto_threads & 1)
+ ret = aio_thread_queue_iocb(req, aio_thread_op_poll, 0);
+#endif
break;
default:
pr_debug("EINVAL: no operation provided\n");
- return -EINVAL;
+ break;
}
if (ret != -EIOCBQUEUED) {
ret == -ERESTARTNOHAND ||
ret == -ERESTART_RESTARTBLOCK))
ret = -EINTR;
- aio_complete(req, ret, 0);
+ aio_complete(&req->common, ret, 0);
}
return 0;
req->ki_user_iocb = user_iocb;
req->ki_user_data = iocb->aio_data;
- ret = aio_run_iocb(&req->common, iocb->aio_lio_opcode,
+ ret = aio_run_iocb(req, iocb->aio_lio_opcode,
(char __user *)(unsigned long)iocb->aio_buf,
iocb->aio_nbytes,
compat);
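End to end, nothing changes for userspace: with aio_auto_threads enabled,
a plain libaio read against a file that only implements synchronous
read()/read_iter() now completes asynchronously via the helper threads. A
minimal illustrative consumer:

#include <libaio.h>

int submit_one_read(int fd, void *buf, size_t len)
{
	io_context_t ctx = 0;
	struct iocb cb, *cbs[1] = { &cb };
	struct io_event ev;
	int ret;

	ret = io_setup(32, &ctx);
	if (ret < 0)
		return ret;
	io_prep_pread(&cb, fd, buf, len, 0);
	ret = io_submit(ctx, 1, cbs);
	if (ret == 1)
		ret = io_getevents(ctx, 1, 1, &ev, NULL);	/* ev.res: bytes or -errno */
	io_destroy(ctx);
	return ret;
}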