]> git.karo-electronics.de Git - linux-beck.git/commitdiff
NFS: make direct write path generate write requests concurrently
authorChuck Lever <cel@netapp.com>
Mon, 20 Mar 2006 18:44:32 +0000 (13:44 -0500)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Mon, 20 Mar 2006 18:44:32 +0000 (13:44 -0500)
Duplicate infrastructure from direct read path that will allow write
path to generate multiple write requests concurrently.  This will
enable us to add support for aio in this path.

Temporarily we will lose the ability to do UNSTABLE writes followed by
a COMMIT in the direct write path.  However, all applications I am
aware of that use NFS O_DIRECT currently write in relatively small
chunks, so this should not be inconvenient in any way.

Test plan:
Millions of fsx-odirect ops. OraSim.

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
fs/nfs/direct.c
fs/nfs/write.c
include/linux/nfs_fs.h

index 4df21ce28e176b165fff72f3339256a17b2e0252..dea3239cdded28df693c7d90b0c682ca98425bc3 100644 (file)
@@ -384,106 +384,185 @@ static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size
        return result;
 }
 
-static ssize_t nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset, struct page **pages, int nr_pages)
+static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
 {
-       const unsigned int wsize = NFS_SERVER(inode)->wsize;
-       size_t request;
-       int curpage, need_commit;
-       ssize_t result, tot_bytes;
-       struct nfs_writeverf first_verf;
-       struct nfs_write_data *wdata;
-
-       wdata = nfs_writedata_alloc(NFS_SERVER(inode)->wpages);
-       if (!wdata)
-               return -ENOMEM;
+       struct list_head *list;
+       struct nfs_direct_req *dreq;
+       unsigned int writes = 0;
+       unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
-       wdata->inode = inode;
-       wdata->cred = ctx->cred;
-       wdata->args.fh = NFS_FH(inode);
-       wdata->args.context = ctx;
-       wdata->args.stable = NFS_UNSTABLE;
-       if (IS_SYNC(inode) || NFS_PROTO(inode)->version == 2 || count <= wsize)
-               wdata->args.stable = NFS_FILE_SYNC;
-       wdata->res.fattr = &wdata->fattr;
-       wdata->res.verf = &wdata->verf;
+       dreq = nfs_direct_req_alloc();
+       if (!dreq)
+               return NULL;
+
+       list = &dreq->list;
+       for(;;) {
+               struct nfs_write_data *data = nfs_writedata_alloc(wpages);
+
+               if (unlikely(!data)) {
+                       while (!list_empty(list)) {
+                               data = list_entry(list->next,
+                                                 struct nfs_write_data, pages);
+                               list_del(&data->pages);
+                               nfs_writedata_free(data);
+                       }
+                       kref_put(&dreq->kref, nfs_direct_req_release);
+                       return NULL;
+               }
+
+               INIT_LIST_HEAD(&data->pages);
+               list_add(&data->pages, list);
+
+               data->req = (struct nfs_page *) dreq;
+               writes++;
+               if (nbytes <= wsize)
+                       break;
+               nbytes -= wsize;
+       }
+       kref_get(&dreq->kref);
+       atomic_set(&dreq->complete, writes);
+       return dreq;
+}
+
+/*
+ * Collects and returns the final error value/byte-count.
+ */
+static ssize_t nfs_direct_write_wait(struct nfs_direct_req *dreq, int intr)
+{
+       int result = 0;
+
+       if (intr) {
+               result = wait_event_interruptible(dreq->wait,
+                                       (atomic_read(&dreq->complete) == 0));
+       } else {
+               wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0));
+       }
+
+       if (!result)
+               result = atomic_read(&dreq->error);
+       if (!result)
+               result = atomic_read(&dreq->count);
+
+       kref_put(&dreq->kref, nfs_direct_req_release);
+       return (ssize_t) result;
+}
+
+static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
+{
+       struct nfs_write_data *data = calldata;
+       struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
+       int status = task->tk_status;
+
+       if (nfs_writeback_done(task, data) != 0)
+               return;
+       /* If the server fell back to an UNSTABLE write, it's an error. */
+       if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
+               status = -EIO;
+
+       if (likely(status >= 0))
+               atomic_add(data->res.count, &dreq->count);
+       else
+               atomic_set(&dreq->error, status);
+
+       if (unlikely(atomic_dec_and_test(&dreq->complete)))
+               nfs_direct_complete(dreq);
+}
+
+static const struct rpc_call_ops nfs_write_direct_ops = {
+       .rpc_call_done = nfs_direct_write_result,
+       .rpc_release = nfs_writedata_release,
+};
+
+/*
+ * For each nfs_write_data struct that was allocated on the list, dispatch
+ * an NFS WRITE operation
+ *
+ * XXX: For now, support only FILE_SYNC writes.  Later we may add
+ *      support for UNSTABLE + COMMIT.
+ */
+static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset)
+{
+       struct list_head *list = &dreq->list;
+       struct page **pages = dreq->pages;
+       size_t wsize = NFS_SERVER(inode)->wsize;
+       unsigned int curpage, pgbase;
 
-       nfs_begin_data_update(inode);
-retry:
-       need_commit = 0;
-       tot_bytes = 0;
        curpage = 0;
-       request = count;
-       wdata->args.pgbase = user_addr & ~PAGE_MASK;
-       wdata->args.offset = file_offset;
+       pgbase = user_addr & ~PAGE_MASK;
        do {
-               wdata->args.count = request;
-               if (wdata->args.count > wsize)
-                       wdata->args.count = wsize;
-               wdata->args.pages = &pages[curpage];
+               struct nfs_write_data *data;
+               size_t bytes;
+
+               bytes = wsize;
+               if (count < wsize)
+                       bytes = count;
+
+               data = list_entry(list->next, struct nfs_write_data, pages);
+               list_del_init(&data->pages);
+
+               data->inode = inode;
+               data->cred = ctx->cred;
+               data->args.fh = NFS_FH(inode);
+               data->args.context = ctx;
+               data->args.offset = file_offset;
+               data->args.pgbase = pgbase;
+               data->args.pages = &pages[curpage];
+               data->args.count = bytes;
+               data->res.fattr = &data->fattr;
+               data->res.count = bytes;
+
+               rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
+                               &nfs_write_direct_ops, data);
+               NFS_PROTO(inode)->write_setup(data, FLUSH_STABLE);
 
-               dprintk("NFS: direct write: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n",
-                       wdata->args.count, (long long) wdata->args.offset,
-                       user_addr + tot_bytes, wdata->args.pgbase, curpage);
+               data->task.tk_priority = RPC_PRIORITY_NORMAL;
+               data->task.tk_cookie = (unsigned long) inode;
 
                lock_kernel();
-               result = NFS_PROTO(inode)->write(wdata);
+               rpc_execute(&data->task);
                unlock_kernel();
 
-               if (result <= 0) {
-                       if (tot_bytes > 0)
-                               break;
-                       goto out;
-               }
+               dfprintk(VFS, "NFS: %4d initiated direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
+                               data->task.tk_pid,
+                               inode->i_sb->s_id,
+                               (long long)NFS_FILEID(inode),
+                               bytes,
+                               (unsigned long long)data->args.offset);
 
-               if (tot_bytes == 0)
-                       memcpy(&first_verf.verifier, &wdata->verf.verifier,
-                                               sizeof(first_verf.verifier));
-               if (wdata->verf.committed != NFS_FILE_SYNC) {
-                       need_commit = 1;
-                       if (memcmp(&first_verf.verifier, &wdata->verf.verifier,
-                                       sizeof(first_verf.verifier)))
-                               goto sync_retry;
-               }
+               file_offset += bytes;
+               pgbase += bytes;
+               curpage += pgbase >> PAGE_SHIFT;
+               pgbase &= ~PAGE_MASK;
 
-               tot_bytes += result;
+               count -= bytes;
+       } while (count != 0);
+}
 
-               /* in case of a short write: stop now, let the app recover */
-               if (result < wdata->args.count)
-                       break;
+static ssize_t nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset, struct page **pages, int nr_pages)
+{
+       ssize_t result;
+       sigset_t oldset;
+       struct rpc_clnt *clnt = NFS_CLIENT(inode);
+       struct nfs_direct_req *dreq;
 
-               wdata->args.offset += result;
-               wdata->args.pgbase += result;
-               curpage += wdata->args.pgbase >> PAGE_SHIFT;
-               wdata->args.pgbase &= ~PAGE_MASK;
-               request -= result;
-       } while (request != 0);
+       dreq = nfs_direct_write_alloc(count, NFS_SERVER(inode)->wsize);
+       if (!dreq)
+               return -ENOMEM;
 
-       /*
-        * Commit data written so far, even in the event of an error
-        */
-       if (need_commit) {
-               wdata->args.count = tot_bytes;
-               wdata->args.offset = file_offset;
+       dreq->pages = pages;
+       dreq->npages = nr_pages;
 
-               lock_kernel();
-               result = NFS_PROTO(inode)->commit(wdata);
-               unlock_kernel();
+       nfs_begin_data_update(inode);
 
-               if (result < 0 || memcmp(&first_verf.verifier,
-                                        &wdata->verf.verifier,
-                                        sizeof(first_verf.verifier)) != 0)
-                       goto sync_retry;
-       }
-       result = tot_bytes;
+       rpc_clnt_sigmask(clnt, &oldset);
+       nfs_direct_write_schedule(dreq, inode, ctx, user_addr, count,
+                                 file_offset);
+       result = nfs_direct_write_wait(dreq, clnt->cl_intr);
+       rpc_clnt_sigunmask(clnt, &oldset);
 
-out:
        nfs_end_data_update(inode);
-       nfs_writedata_free(wdata);
-       return result;
 
-sync_retry:
-       wdata->args.stable = NFS_FILE_SYNC;
-       goto retry;
+       return result;
 }
 
 /*
@@ -515,7 +594,6 @@ static ssize_t nfs_direct_write(struct inode *inode, struct nfs_open_context *ct
                nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, size);
                result = nfs_direct_write_seg(inode, ctx, user_addr, size,
                                file_offset, pages, page_count);
-               nfs_free_user_pages(pages, page_count, 0);
 
                if (result <= 0) {
                        if (tot_bytes > 0)
index 5912274ff1a123ad1f96720ea668e3bde4e32840..875f5b0605338068b0c84045fdf690e0317cd46a 100644 (file)
@@ -77,7 +77,6 @@ static struct nfs_page * nfs_update_request(struct nfs_open_context*,
                                            struct inode *,
                                            struct page *,
                                            unsigned int, unsigned int);
-static int nfs_writeback_done(struct rpc_task *, struct nfs_write_data *);
 static int nfs_wait_on_write_congestion(struct address_space *, int);
 static int nfs_wait_on_requests(struct inode *, unsigned long, unsigned int);
 static int nfs_flush_inode(struct inode *inode, unsigned long idx_start,
@@ -1183,7 +1182,7 @@ static const struct rpc_call_ops nfs_write_full_ops = {
 /*
  * This function is called when the WRITE call is complete.
  */
-static int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
+int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
 {
        struct nfs_writeargs    *argp = &data->args;
        struct nfs_writeres     *resp = &data->res;
index f55827be4f8e0f270ef6925aac11a1134dcc47e5..6c130a6b0f4dfbdc46b5bd52634aff4f147f8e36 100644 (file)
@@ -407,6 +407,8 @@ extern int  nfs_writepage(struct page *page, struct writeback_control *wbc);
 extern int  nfs_writepages(struct address_space *, struct writeback_control *);
 extern int  nfs_flush_incompatible(struct file *file, struct page *page);
 extern int  nfs_updatepage(struct file *, struct page *, unsigned int, unsigned int);
+extern int nfs_writeback_done(struct rpc_task *, struct nfs_write_data *);
+extern void nfs_writedata_release(void *);
 
 /*
  * Try to write back everything synchronously (but check the