]> git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - fs/direct-io.c
Merge tag 'gfs2-merge-window' of git://git.kernel.org/pub/scm/linux/kernel/git/steve...
[karo-tx-linux.git] / fs / direct-io.c
index 7ab90f5081eebc4ab8b0de88bef8d0b6310ed113..1782023bd68a6655d6def2258263d22080c06c33 100644 (file)
@@ -127,6 +127,7 @@ struct dio {
        spinlock_t bio_lock;            /* protects BIO fields below */
        int page_errors;                /* errno from get_user_pages() */
        int is_async;                   /* is IO async ? */
+       bool defer_completion;          /* defer AIO completion to workqueue? */
        int io_error;                   /* IO error in completion path */
        unsigned long refcount;         /* direct_io_worker() and bios */
        struct bio *bio_list;           /* singly linked via bi_private */
@@ -141,7 +142,10 @@ struct dio {
         * allocation time.  Don't add new fields after pages[] unless you
         * wish that they not be zeroed.
         */
-       struct page *pages[DIO_PAGES];  /* page buffer */
+       union {
+               struct page *pages[DIO_PAGES];  /* page buffer */
+               struct work_struct complete_work;/* deferred AIO completion */
+       };
 } ____cacheline_aligned_in_smp;
 
 static struct kmem_cache *dio_cache __read_mostly;
@@ -221,16 +225,16 @@ static inline struct page *dio_get_page(struct dio *dio,
  * dio_complete() - called when all DIO BIO I/O has been completed
  * @offset: the byte offset in the file of the completed operation
  *
- * This releases locks as dictated by the locking type, lets interested parties
- * know that a DIO operation has completed, and calculates the resulting return
- * code for the operation.
+ * This drops i_dio_count, lets interested parties know that a DIO operation
+ * has completed, and calculates the resulting return code for the operation.
  *
  * It lets the filesystem know if it registered an interest earlier via
  * get_block.  Pass the private field of the map buffer_head so that
  * filesystems can use it to hold additional state between get_block calls and
  * dio_complete.
  */
-static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async)
+static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret,
+               bool is_async)
 {
        ssize_t transferred = 0;
 
@@ -258,19 +262,36 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is
        if (ret == 0)
                ret = transferred;
 
-       if (dio->end_io && dio->result) {
-               dio->end_io(dio->iocb, offset, transferred,
-                           dio->private, ret, is_async);
-       } else {
-               inode_dio_done(dio->inode);
-               if (is_async)
-                       aio_complete(dio->iocb, ret, 0);
+       if (dio->end_io && dio->result)
+               dio->end_io(dio->iocb, offset, transferred, dio->private);
+
+       inode_dio_done(dio->inode);
+       if (is_async) {
+               if (dio->rw & WRITE) {
+                       int err;
+
+                       err = generic_write_sync(dio->iocb->ki_filp, offset,
+                                                transferred);
+                       if (err < 0 && ret > 0)
+                               ret = err;
+               }
+
+               aio_complete(dio->iocb, ret, 0);
        }
 
+       kmem_cache_free(dio_cache, dio);
        return ret;
 }
 
+static void dio_aio_complete_work(struct work_struct *work)
+{
+       struct dio *dio = container_of(work, struct dio, complete_work);
+
+       dio_complete(dio, dio->iocb->ki_pos, 0, true);
+}
+
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
+
 /*
  * Asynchronous IO callback. 
  */
@@ -290,8 +311,13 @@ static void dio_bio_end_aio(struct bio *bio, int error)
        spin_unlock_irqrestore(&dio->bio_lock, flags);
 
        if (remaining == 0) {
-               dio_complete(dio, dio->iocb->ki_pos, 0, true);
-               kmem_cache_free(dio_cache, dio);
+               if (dio->result && dio->defer_completion) {
+                       INIT_WORK(&dio->complete_work, dio_aio_complete_work);
+                       queue_work(dio->inode->i_sb->s_dio_done_wq,
+                                  &dio->complete_work);
+               } else {
+                       dio_complete(dio, dio->iocb->ki_pos, 0, true);
+               }
        }
 }
 
@@ -510,6 +536,41 @@ static inline int dio_bio_reap(struct dio *dio, struct dio_submit *sdio)
        return ret;
 }
 
+/*
+ * Create workqueue for deferred direct IO completions. We allocate the
+ * workqueue when it's first needed. This avoids creating workqueue for
+ * filesystems that don't need it and also allows us to create the workqueue
+ * late enough so the we can include s_id in the name of the workqueue.
+ */
+static int sb_init_dio_done_wq(struct super_block *sb)
+{
+       struct workqueue_struct *wq = alloc_workqueue("dio/%s",
+                                                     WQ_MEM_RECLAIM, 0,
+                                                     sb->s_id);
+       if (!wq)
+               return -ENOMEM;
+       /*
+        * This has to be atomic as more DIOs can race to create the workqueue
+        */
+       cmpxchg(&sb->s_dio_done_wq, NULL, wq);
+       /* Someone created workqueue before us? Free ours... */
+       if (wq != sb->s_dio_done_wq)
+               destroy_workqueue(wq);
+       return 0;
+}
+
+static int dio_set_defer_completion(struct dio *dio)
+{
+       struct super_block *sb = dio->inode->i_sb;
+
+       if (dio->defer_completion)
+               return 0;
+       dio->defer_completion = true;
+       if (!sb->s_dio_done_wq)
+               return sb_init_dio_done_wq(sb);
+       return 0;
+}
+
 /*
  * Call into the fs to map some more disk blocks.  We record the current number
  * of available blocks at sdio->blocks_available.  These are in units of the
@@ -581,6 +642,9 @@ static int get_more_blocks(struct dio *dio, struct dio_submit *sdio,
 
                /* Store for completion */
                dio->private = map_bh->b_private;
+
+               if (ret == 0 && buffer_defer_completion(map_bh))
+                       ret = dio_set_defer_completion(dio);
        }
        return ret;
 }
@@ -1128,11 +1192,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
                }
        }
 
-       /*
-        * Will be decremented at I/O completion time.
-        */
-       atomic_inc(&inode->i_dio_count);
-
        /*
         * For file extending writes updating i_size before data
         * writeouts complete can expose uninitialized blocks. So
@@ -1141,11 +1200,33 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
         */
        dio->is_async = !is_sync_kiocb(iocb) && !((rw & WRITE) &&
                (end > i_size_read(inode)));
-
-       retval = 0;
-
        dio->inode = inode;
        dio->rw = rw;
+
+       /*
+        * For AIO O_(D)SYNC writes we need to defer completions to a workqueue
+        * so that we can call ->fsync.
+        */
+       if (dio->is_async && (rw & WRITE) &&
+           ((iocb->ki_filp->f_flags & O_DSYNC) ||
+            IS_SYNC(iocb->ki_filp->f_mapping->host))) {
+               retval = dio_set_defer_completion(dio);
+               if (retval) {
+                       /*
+                        * We grab i_mutex only for reads so we don't have
+                        * to release it here
+                        */
+                       kmem_cache_free(dio_cache, dio);
+                       goto out;
+               }
+       }
+
+       /*
+        * Will be decremented at I/O completion time.
+        */
+       atomic_inc(&inode->i_dio_count);
+
+       retval = 0;
        sdio.blkbits = blkbits;
        sdio.blkfactor = i_blkbits - blkbits;
        sdio.block_in_file = offset >> blkbits;
@@ -1269,7 +1350,6 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
 
        if (drop_refcount(dio) == 0) {
                retval = dio_complete(dio, offset, retval, false);
-               kmem_cache_free(dio_cache, dio);
        } else
                BUG_ON(retval != -EIOCBQUEUED);