]> git.karo-electronics.de Git - karo-tx-linux.git/blob - fs/ocfs2/aops.c
ocfs2: teach ocfs2_file_aio_write() about sparse files
[karo-tx-linux.git] / fs / ocfs2 / aops.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2002, 2004 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/slab.h>
24 #include <linux/highmem.h>
25 #include <linux/pagemap.h>
26 #include <asm/byteorder.h>
27 #include <linux/swap.h>
28
29 #define MLOG_MASK_PREFIX ML_FILE_IO
30 #include <cluster/masklog.h>
31
32 #include "ocfs2.h"
33
34 #include "alloc.h"
35 #include "aops.h"
36 #include "dlmglue.h"
37 #include "extent_map.h"
38 #include "file.h"
39 #include "inode.h"
40 #include "journal.h"
41 #include "suballoc.h"
42 #include "super.h"
43 #include "symlink.h"
44
45 #include "buffer_head_io.h"
46
47 static int ocfs2_symlink_get_block(struct inode *inode, sector_t iblock,
48                                    struct buffer_head *bh_result, int create)
49 {
50         int err = -EIO;
51         int status;
52         struct ocfs2_dinode *fe = NULL;
53         struct buffer_head *bh = NULL;
54         struct buffer_head *buffer_cache_bh = NULL;
55         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
56         void *kaddr;
57
58         mlog_entry("(0x%p, %llu, 0x%p, %d)\n", inode,
59                    (unsigned long long)iblock, bh_result, create);
60
61         BUG_ON(ocfs2_inode_is_fast_symlink(inode));
62
63         if ((iblock << inode->i_sb->s_blocksize_bits) > PATH_MAX + 1) {
64                 mlog(ML_ERROR, "block offset > PATH_MAX: %llu",
65                      (unsigned long long)iblock);
66                 goto bail;
67         }
68
69         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
70                                   OCFS2_I(inode)->ip_blkno,
71                                   &bh, OCFS2_BH_CACHED, inode);
72         if (status < 0) {
73                 mlog_errno(status);
74                 goto bail;
75         }
76         fe = (struct ocfs2_dinode *) bh->b_data;
77
78         if (!OCFS2_IS_VALID_DINODE(fe)) {
79                 mlog(ML_ERROR, "Invalid dinode #%llu: signature = %.*s\n",
80                      (unsigned long long)fe->i_blkno, 7, fe->i_signature);
81                 goto bail;
82         }
83
84         if ((u64)iblock >= ocfs2_clusters_to_blocks(inode->i_sb,
85                                                     le32_to_cpu(fe->i_clusters))) {
86                 mlog(ML_ERROR, "block offset is outside the allocated size: "
87                      "%llu\n", (unsigned long long)iblock);
88                 goto bail;
89         }
90
91         /* We don't use the page cache to create symlink data, so if
92          * need be, copy it over from the buffer cache. */
93         if (!buffer_uptodate(bh_result) && ocfs2_inode_is_new(inode)) {
94                 u64 blkno = le64_to_cpu(fe->id2.i_list.l_recs[0].e_blkno) +
95                             iblock;
96                 buffer_cache_bh = sb_getblk(osb->sb, blkno);
97                 if (!buffer_cache_bh) {
98                         mlog(ML_ERROR, "couldn't getblock for symlink!\n");
99                         goto bail;
100                 }
101
102                 /* we haven't locked out transactions, so a commit
103                  * could've happened. Since we've got a reference on
104                  * the bh, even if it commits while we're doing the
105                  * copy, the data is still good. */
106                 if (buffer_jbd(buffer_cache_bh)
107                     && ocfs2_inode_is_new(inode)) {
108                         kaddr = kmap_atomic(bh_result->b_page, KM_USER0);
109                         if (!kaddr) {
110                                 mlog(ML_ERROR, "couldn't kmap!\n");
111                                 goto bail;
112                         }
113                         memcpy(kaddr + (bh_result->b_size * iblock),
114                                buffer_cache_bh->b_data,
115                                bh_result->b_size);
116                         kunmap_atomic(kaddr, KM_USER0);
117                         set_buffer_uptodate(bh_result);
118                 }
119                 brelse(buffer_cache_bh);
120         }
121
122         map_bh(bh_result, inode->i_sb,
123                le64_to_cpu(fe->id2.i_list.l_recs[0].e_blkno) + iblock);
124
125         err = 0;
126
127 bail:
128         if (bh)
129                 brelse(bh);
130
131         mlog_exit(err);
132         return err;
133 }
134
135 static int ocfs2_get_block(struct inode *inode, sector_t iblock,
136                            struct buffer_head *bh_result, int create)
137 {
138         int err = 0;
139         u64 p_blkno, past_eof;
140
141         mlog_entry("(0x%p, %llu, 0x%p, %d)\n", inode,
142                    (unsigned long long)iblock, bh_result, create);
143
144         if (OCFS2_I(inode)->ip_flags & OCFS2_INODE_SYSTEM_FILE)
145                 mlog(ML_NOTICE, "get_block on system inode 0x%p (%lu)\n",
146                      inode, inode->i_ino);
147
148         if (S_ISLNK(inode->i_mode)) {
149                 /* this always does I/O for some reason. */
150                 err = ocfs2_symlink_get_block(inode, iblock, bh_result, create);
151                 goto bail;
152         }
153
154         /* this can happen if another node truncs after our extend! */
155         spin_lock(&OCFS2_I(inode)->ip_lock);
156         if (iblock >= ocfs2_clusters_to_blocks(inode->i_sb,
157                                                OCFS2_I(inode)->ip_clusters))
158                 err = -EIO;
159         spin_unlock(&OCFS2_I(inode)->ip_lock);
160         if (err)
161                 goto bail;
162
163         err = ocfs2_extent_map_get_blocks(inode, iblock, &p_blkno, NULL);
164         if (err) {
165                 mlog(ML_ERROR, "Error %d from get_blocks(0x%p, %llu, 1, "
166                      "%llu, NULL)\n", err, inode, (unsigned long long)iblock,
167                      (unsigned long long)p_blkno);
168                 goto bail;
169         }
170
171         map_bh(bh_result, inode->i_sb, p_blkno);
172
173         if (bh_result->b_blocknr == 0) {
174                 err = -EIO;
175                 mlog(ML_ERROR, "iblock = %llu p_blkno = %llu blkno=(%llu)\n",
176                      (unsigned long long)iblock,
177                      (unsigned long long)p_blkno,
178                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
179         }
180
181         past_eof = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode));
182         mlog(0, "Inode %lu, past_eof = %llu\n", inode->i_ino,
183              (unsigned long long)past_eof);
184
185         if (create && (iblock >= past_eof))
186                 set_buffer_new(bh_result);
187
188 bail:
189         if (err < 0)
190                 err = -EIO;
191
192         mlog_exit(err);
193         return err;
194 }
195
196 static int ocfs2_readpage(struct file *file, struct page *page)
197 {
198         struct inode *inode = page->mapping->host;
199         loff_t start = (loff_t)page->index << PAGE_CACHE_SHIFT;
200         int ret, unlock = 1;
201
202         mlog_entry("(0x%p, %lu)\n", file, (page ? page->index : 0));
203
204         ret = ocfs2_meta_lock_with_page(inode, NULL, 0, page);
205         if (ret != 0) {
206                 if (ret == AOP_TRUNCATED_PAGE)
207                         unlock = 0;
208                 mlog_errno(ret);
209                 goto out;
210         }
211
212         down_read(&OCFS2_I(inode)->ip_alloc_sem);
213
214         /*
215          * i_size might have just been updated as we grabed the meta lock.  We
216          * might now be discovering a truncate that hit on another node.
217          * block_read_full_page->get_block freaks out if it is asked to read
218          * beyond the end of a file, so we check here.  Callers
219          * (generic_file_read, fault->nopage) are clever enough to check i_size
220          * and notice that the page they just read isn't needed.
221          *
222          * XXX sys_readahead() seems to get that wrong?
223          */
224         if (start >= i_size_read(inode)) {
225                 char *addr = kmap(page);
226                 memset(addr, 0, PAGE_SIZE);
227                 flush_dcache_page(page);
228                 kunmap(page);
229                 SetPageUptodate(page);
230                 ret = 0;
231                 goto out_alloc;
232         }
233
234         ret = ocfs2_data_lock_with_page(inode, 0, page);
235         if (ret != 0) {
236                 if (ret == AOP_TRUNCATED_PAGE)
237                         unlock = 0;
238                 mlog_errno(ret);
239                 goto out_alloc;
240         }
241
242         ret = block_read_full_page(page, ocfs2_get_block);
243         unlock = 0;
244
245         ocfs2_data_unlock(inode, 0);
246 out_alloc:
247         up_read(&OCFS2_I(inode)->ip_alloc_sem);
248         ocfs2_meta_unlock(inode, 0);
249 out:
250         if (unlock)
251                 unlock_page(page);
252         mlog_exit(ret);
253         return ret;
254 }
255
256 /* Note: Because we don't support holes, our allocation has
257  * already happened (allocation writes zeros to the file data)
258  * so we don't have to worry about ordered writes in
259  * ocfs2_writepage.
260  *
261  * ->writepage is called during the process of invalidating the page cache
262  * during blocked lock processing.  It can't block on any cluster locks
263  * to during block mapping.  It's relying on the fact that the block
264  * mapping can't have disappeared under the dirty pages that it is
265  * being asked to write back.
266  */
267 static int ocfs2_writepage(struct page *page, struct writeback_control *wbc)
268 {
269         int ret;
270
271         mlog_entry("(0x%p)\n", page);
272
273         ret = block_write_full_page(page, ocfs2_get_block, wbc);
274
275         mlog_exit(ret);
276
277         return ret;
278 }
279
280 /* This can also be called from ocfs2_write_zero_page() which has done
281  * it's own cluster locking. */
282 int ocfs2_prepare_write_nolock(struct inode *inode, struct page *page,
283                                unsigned from, unsigned to)
284 {
285         int ret;
286
287         down_read(&OCFS2_I(inode)->ip_alloc_sem);
288
289         ret = block_prepare_write(page, from, to, ocfs2_get_block);
290
291         up_read(&OCFS2_I(inode)->ip_alloc_sem);
292
293         return ret;
294 }
295
296 /*
297  * ocfs2_prepare_write() can be an outer-most ocfs2 call when it is called
298  * from loopback.  It must be able to perform its own locking around
299  * ocfs2_get_block().
300  */
301 static int ocfs2_prepare_write(struct file *file, struct page *page,
302                                unsigned from, unsigned to)
303 {
304         struct inode *inode = page->mapping->host;
305         int ret;
306
307         mlog_entry("(0x%p, 0x%p, %u, %u)\n", file, page, from, to);
308
309         ret = ocfs2_meta_lock_with_page(inode, NULL, 0, page);
310         if (ret != 0) {
311                 mlog_errno(ret);
312                 goto out;
313         }
314
315         ret = ocfs2_prepare_write_nolock(inode, page, from, to);
316
317         ocfs2_meta_unlock(inode, 0);
318 out:
319         mlog_exit(ret);
320         return ret;
321 }
322
323 /* Taken from ext3. We don't necessarily need the full blown
324  * functionality yet, but IMHO it's better to cut and paste the whole
325  * thing so we can avoid introducing our own bugs (and easily pick up
326  * their fixes when they happen) --Mark */
327 static int walk_page_buffers(   handle_t *handle,
328                                 struct buffer_head *head,
329                                 unsigned from,
330                                 unsigned to,
331                                 int *partial,
332                                 int (*fn)(      handle_t *handle,
333                                                 struct buffer_head *bh))
334 {
335         struct buffer_head *bh;
336         unsigned block_start, block_end;
337         unsigned blocksize = head->b_size;
338         int err, ret = 0;
339         struct buffer_head *next;
340
341         for (   bh = head, block_start = 0;
342                 ret == 0 && (bh != head || !block_start);
343                 block_start = block_end, bh = next)
344         {
345                 next = bh->b_this_page;
346                 block_end = block_start + blocksize;
347                 if (block_end <= from || block_start >= to) {
348                         if (partial && !buffer_uptodate(bh))
349                                 *partial = 1;
350                         continue;
351                 }
352                 err = (*fn)(handle, bh);
353                 if (!ret)
354                         ret = err;
355         }
356         return ret;
357 }
358
359 handle_t *ocfs2_start_walk_page_trans(struct inode *inode,
360                                                          struct page *page,
361                                                          unsigned from,
362                                                          unsigned to)
363 {
364         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
365         handle_t *handle = NULL;
366         int ret = 0;
367
368         handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
369         if (!handle) {
370                 ret = -ENOMEM;
371                 mlog_errno(ret);
372                 goto out;
373         }
374
375         if (ocfs2_should_order_data(inode)) {
376                 ret = walk_page_buffers(handle,
377                                         page_buffers(page),
378                                         from, to, NULL,
379                                         ocfs2_journal_dirty_data);
380                 if (ret < 0) 
381                         mlog_errno(ret);
382         }
383 out:
384         if (ret) {
385                 if (handle)
386                         ocfs2_commit_trans(osb, handle);
387                 handle = ERR_PTR(ret);
388         }
389         return handle;
390 }
391
392 static int ocfs2_commit_write(struct file *file, struct page *page,
393                               unsigned from, unsigned to)
394 {
395         int ret;
396         struct buffer_head *di_bh = NULL;
397         struct inode *inode = page->mapping->host;
398         handle_t *handle = NULL;
399         struct ocfs2_dinode *di;
400
401         mlog_entry("(0x%p, 0x%p, %u, %u)\n", file, page, from, to);
402
403         /* NOTE: ocfs2_file_aio_write has ensured that it's safe for
404          * us to continue here without rechecking the I/O against
405          * changed inode values.
406          *
407          * 1) We're currently holding the inode alloc lock, so no
408          *    nodes can change it underneath us.
409          *
410          * 2) We've had to take the metadata lock at least once
411          *    already to check for extending writes, suid removal, etc.
412          *    The meta data update code then ensures that we don't get a
413          *    stale inode allocation image (i_size, i_clusters, etc).
414          */
415
416         ret = ocfs2_meta_lock_with_page(inode, &di_bh, 1, page);
417         if (ret != 0) {
418                 mlog_errno(ret);
419                 goto out;
420         }
421
422         ret = ocfs2_data_lock_with_page(inode, 1, page);
423         if (ret != 0) {
424                 mlog_errno(ret);
425                 goto out_unlock_meta;
426         }
427
428         handle = ocfs2_start_walk_page_trans(inode, page, from, to);
429         if (IS_ERR(handle)) {
430                 ret = PTR_ERR(handle);
431                 goto out_unlock_data;
432         }
433
434         /* Mark our buffer early. We'd rather catch this error up here
435          * as opposed to after a successful commit_write which would
436          * require us to set back inode->i_size. */
437         ret = ocfs2_journal_access(handle, inode, di_bh,
438                                    OCFS2_JOURNAL_ACCESS_WRITE);
439         if (ret < 0) {
440                 mlog_errno(ret);
441                 goto out_commit;
442         }
443
444         /* might update i_size */
445         ret = generic_commit_write(file, page, from, to);
446         if (ret < 0) {
447                 mlog_errno(ret);
448                 goto out_commit;
449         }
450
451         di = (struct ocfs2_dinode *)di_bh->b_data;
452
453         /* ocfs2_mark_inode_dirty() is too heavy to use here. */
454         inode->i_mtime = inode->i_ctime = CURRENT_TIME;
455         di->i_mtime = di->i_ctime = cpu_to_le64(inode->i_mtime.tv_sec);
456         di->i_mtime_nsec = di->i_ctime_nsec = cpu_to_le32(inode->i_mtime.tv_nsec);
457
458         inode->i_blocks = ocfs2_align_bytes_to_sectors((u64)(i_size_read(inode)));
459         di->i_size = cpu_to_le64((u64)i_size_read(inode));
460
461         ret = ocfs2_journal_dirty(handle, di_bh);
462         if (ret < 0) {
463                 mlog_errno(ret);
464                 goto out_commit;
465         }
466
467 out_commit:
468         ocfs2_commit_trans(OCFS2_SB(inode->i_sb), handle);
469 out_unlock_data:
470         ocfs2_data_unlock(inode, 1);
471 out_unlock_meta:
472         ocfs2_meta_unlock(inode, 1);
473 out:
474         if (di_bh)
475                 brelse(di_bh);
476
477         mlog_exit(ret);
478         return ret;
479 }
480
481 static sector_t ocfs2_bmap(struct address_space *mapping, sector_t block)
482 {
483         sector_t status;
484         u64 p_blkno = 0;
485         int err = 0;
486         struct inode *inode = mapping->host;
487
488         mlog_entry("(block = %llu)\n", (unsigned long long)block);
489
490         /* We don't need to lock journal system files, since they aren't
491          * accessed concurrently from multiple nodes.
492          */
493         if (!INODE_JOURNAL(inode)) {
494                 err = ocfs2_meta_lock(inode, NULL, 0);
495                 if (err) {
496                         if (err != -ENOENT)
497                                 mlog_errno(err);
498                         goto bail;
499                 }
500                 down_read(&OCFS2_I(inode)->ip_alloc_sem);
501         }
502
503         err = ocfs2_extent_map_get_blocks(inode, block, &p_blkno, NULL);
504
505         if (!INODE_JOURNAL(inode)) {
506                 up_read(&OCFS2_I(inode)->ip_alloc_sem);
507                 ocfs2_meta_unlock(inode, 0);
508         }
509
510         if (err) {
511                 mlog(ML_ERROR, "get_blocks() failed, block = %llu\n",
512                      (unsigned long long)block);
513                 mlog_errno(err);
514                 goto bail;
515         }
516
517
518 bail:
519         status = err ? 0 : p_blkno;
520
521         mlog_exit((int)status);
522
523         return status;
524 }
525
526 /*
527  * TODO: Make this into a generic get_blocks function.
528  *
529  * From do_direct_io in direct-io.c:
530  *  "So what we do is to permit the ->get_blocks function to populate
531  *   bh.b_size with the size of IO which is permitted at this offset and
532  *   this i_blkbits."
533  *
534  * This function is called directly from get_more_blocks in direct-io.c.
535  *
536  * called like this: dio->get_blocks(dio->inode, fs_startblk,
537  *                                      fs_count, map_bh, dio->rw == WRITE);
538  */
539 static int ocfs2_direct_IO_get_blocks(struct inode *inode, sector_t iblock,
540                                      struct buffer_head *bh_result, int create)
541 {
542         int ret;
543         u64 p_blkno, inode_blocks;
544         int contig_blocks;
545         unsigned char blocksize_bits = inode->i_sb->s_blocksize_bits;
546         unsigned long max_blocks = bh_result->b_size >> inode->i_blkbits;
547
548         /* This function won't even be called if the request isn't all
549          * nicely aligned and of the right size, so there's no need
550          * for us to check any of that. */
551
552         spin_lock(&OCFS2_I(inode)->ip_lock);
553         inode_blocks = ocfs2_clusters_to_blocks(inode->i_sb,
554                                                 OCFS2_I(inode)->ip_clusters);
555
556         /*
557          * For a read which begins past the end of file, we return a hole.
558          */
559         if (!create && (iblock >= inode_blocks)) {
560                 spin_unlock(&OCFS2_I(inode)->ip_lock);
561                 ret = 0;
562                 goto bail;
563         }
564
565         /*
566          * Any write past EOF is not allowed because we'd be extending.
567          */
568         if (create && (iblock + max_blocks) > inode_blocks) {
569                 spin_unlock(&OCFS2_I(inode)->ip_lock);
570                 ret = -EIO;
571                 goto bail;
572         }
573         spin_unlock(&OCFS2_I(inode)->ip_lock);
574
575         /* This figures out the size of the next contiguous block, and
576          * our logical offset */
577         ret = ocfs2_extent_map_get_blocks(inode, iblock, &p_blkno,
578                                           &contig_blocks);
579         if (ret) {
580                 mlog(ML_ERROR, "get_blocks() failed iblock=%llu\n",
581                      (unsigned long long)iblock);
582                 ret = -EIO;
583                 goto bail;
584         }
585
586         map_bh(bh_result, inode->i_sb, p_blkno);
587
588         /* make sure we don't map more than max_blocks blocks here as
589            that's all the kernel will handle at this point. */
590         if (max_blocks < contig_blocks)
591                 contig_blocks = max_blocks;
592         bh_result->b_size = contig_blocks << blocksize_bits;
593 bail:
594         return ret;
595 }
596
597 /* 
598  * ocfs2_dio_end_io is called by the dio core when a dio is finished.  We're
599  * particularly interested in the aio/dio case.  Like the core uses
600  * i_alloc_sem, we use the rw_lock DLM lock to protect io on one node from
601  * truncation on another.
602  */
603 static void ocfs2_dio_end_io(struct kiocb *iocb,
604                              loff_t offset,
605                              ssize_t bytes,
606                              void *private)
607 {
608         struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode;
609
610         /* this io's submitter should not have unlocked this before we could */
611         BUG_ON(!ocfs2_iocb_is_rw_locked(iocb));
612         ocfs2_iocb_clear_rw_locked(iocb);
613         up_read(&inode->i_alloc_sem);
614         ocfs2_rw_unlock(inode, 0);
615 }
616
617 /*
618  * ocfs2_invalidatepage() and ocfs2_releasepage() are shamelessly stolen
619  * from ext3.  PageChecked() bits have been removed as OCFS2 does not
620  * do journalled data.
621  */
622 static void ocfs2_invalidatepage(struct page *page, unsigned long offset)
623 {
624         journal_t *journal = OCFS2_SB(page->mapping->host->i_sb)->journal->j_journal;
625
626         journal_invalidatepage(journal, page, offset);
627 }
628
629 static int ocfs2_releasepage(struct page *page, gfp_t wait)
630 {
631         journal_t *journal = OCFS2_SB(page->mapping->host->i_sb)->journal->j_journal;
632
633         if (!page_has_buffers(page))
634                 return 0;
635         return journal_try_to_free_buffers(journal, page, wait);
636 }
637
638 static ssize_t ocfs2_direct_IO(int rw,
639                                struct kiocb *iocb,
640                                const struct iovec *iov,
641                                loff_t offset,
642                                unsigned long nr_segs)
643 {
644         struct file *file = iocb->ki_filp;
645         struct inode *inode = file->f_path.dentry->d_inode->i_mapping->host;
646         int ret;
647
648         mlog_entry_void();
649
650         if (!ocfs2_sparse_alloc(OCFS2_SB(inode->i_sb))) {
651                 /*
652                  * We get PR data locks even for O_DIRECT.  This
653                  * allows concurrent O_DIRECT I/O but doesn't let
654                  * O_DIRECT with extending and buffered zeroing writes
655                  * race.  If they did race then the buffered zeroing
656                  * could be written back after the O_DIRECT I/O.  It's
657                  * one thing to tell people not to mix buffered and
658                  * O_DIRECT writes, but expecting them to understand
659                  * that file extension is also an implicit buffered
660                  * write is too much.  By getting the PR we force
661                  * writeback of the buffered zeroing before
662                  * proceeding.
663                  */
664                 ret = ocfs2_data_lock(inode, 0);
665                 if (ret < 0) {
666                         mlog_errno(ret);
667                         goto out;
668                 }
669                 ocfs2_data_unlock(inode, 0);
670         }
671
672         ret = blockdev_direct_IO_no_locking(rw, iocb, inode,
673                                             inode->i_sb->s_bdev, iov, offset,
674                                             nr_segs, 
675                                             ocfs2_direct_IO_get_blocks,
676                                             ocfs2_dio_end_io);
677 out:
678         mlog_exit(ret);
679         return ret;
680 }
681
682 static void ocfs2_figure_cluster_boundaries(struct ocfs2_super *osb,
683                                             u32 cpos,
684                                             unsigned int *start,
685                                             unsigned int *end)
686 {
687         unsigned int cluster_start = 0, cluster_end = PAGE_CACHE_SIZE;
688
689         if (unlikely(PAGE_CACHE_SHIFT > osb->s_clustersize_bits)) {
690                 unsigned int cpp;
691
692                 cpp = 1 << (PAGE_CACHE_SHIFT - osb->s_clustersize_bits);
693
694                 cluster_start = cpos % cpp;
695                 cluster_start = cluster_start << osb->s_clustersize_bits;
696
697                 cluster_end = cluster_start + osb->s_clustersize;
698         }
699
700         BUG_ON(cluster_start > PAGE_SIZE);
701         BUG_ON(cluster_end > PAGE_SIZE);
702
703         if (start)
704                 *start = cluster_start;
705         if (end)
706                 *end = cluster_end;
707 }
708
709 /*
710  * 'from' and 'to' are the region in the page to avoid zeroing.
711  *
712  * If pagesize > clustersize, this function will avoid zeroing outside
713  * of the cluster boundary.
714  *
715  * from == to == 0 is code for "zero the entire cluster region"
716  */
717 static void ocfs2_clear_page_regions(struct page *page,
718                                      struct ocfs2_super *osb, u32 cpos,
719                                      unsigned from, unsigned to)
720 {
721         void *kaddr;
722         unsigned int cluster_start, cluster_end;
723
724         ocfs2_figure_cluster_boundaries(osb, cpos, &cluster_start, &cluster_end);
725
726         kaddr = kmap_atomic(page, KM_USER0);
727
728         if (from || to) {
729                 if (from > cluster_start)
730                         memset(kaddr + cluster_start, 0, from - cluster_start);
731                 if (to < cluster_end)
732                         memset(kaddr + to, 0, cluster_end - to);
733         } else {
734                 memset(kaddr + cluster_start, 0, cluster_end - cluster_start);
735         }
736
737         kunmap_atomic(kaddr, KM_USER0);
738 }
739
740 /*
741  * Some of this taken from block_prepare_write(). We already have our
742  * mapping by now though, and the entire write will be allocating or
743  * it won't, so not much need to use BH_New.
744  *
745  * This will also skip zeroing, which is handled externally.
746  */
747 static int ocfs2_map_page_blocks(struct page *page, u64 *p_blkno,
748                                  struct inode *inode, unsigned int from,
749                                  unsigned int to, int new)
750 {
751         int ret = 0;
752         struct buffer_head *head, *bh, *wait[2], **wait_bh = wait;
753         unsigned int block_end, block_start;
754         unsigned int bsize = 1 << inode->i_blkbits;
755
756         if (!page_has_buffers(page))
757                 create_empty_buffers(page, bsize, 0);
758
759         head = page_buffers(page);
760         for (bh = head, block_start = 0; bh != head || !block_start;
761              bh = bh->b_this_page, block_start += bsize) {
762                 block_end = block_start + bsize;
763
764                 /*
765                  * Ignore blocks outside of our i/o range -
766                  * they may belong to unallocated clusters.
767                  */
768                 if (block_start >= to ||
769                     (block_start + bsize) <= from) {
770                         if (PageUptodate(page))
771                                 set_buffer_uptodate(bh);
772                         continue;
773                 }
774
775                 /*
776                  * For an allocating write with cluster size >= page
777                  * size, we always write the entire page.
778                  */
779
780                 if (buffer_new(bh))
781                         clear_buffer_new(bh);
782
783                 if (!buffer_mapped(bh)) {
784                         map_bh(bh, inode->i_sb, *p_blkno);
785                         unmap_underlying_metadata(bh->b_bdev, bh->b_blocknr);
786                 }
787
788                 if (PageUptodate(page)) {
789                         if (!buffer_uptodate(bh))
790                                 set_buffer_uptodate(bh);
791                 } else if (!buffer_uptodate(bh) && !buffer_delay(bh) &&
792                      (block_start < from || block_end > to)) {
793                         ll_rw_block(READ, 1, &bh);
794                         *wait_bh++=bh;
795                 }
796
797                 *p_blkno = *p_blkno + 1;
798         }
799
800         /*
801          * If we issued read requests - let them complete.
802          */
803         while(wait_bh > wait) {
804                 wait_on_buffer(*--wait_bh);
805                 if (!buffer_uptodate(*wait_bh))
806                         ret = -EIO;
807         }
808
809         if (ret == 0 || !new)
810                 return ret;
811
812         /*
813          * If we get -EIO above, zero out any newly allocated blocks
814          * to avoid exposing stale data.
815          */
816         bh = head;
817         block_start = 0;
818         do {
819                 void *kaddr;
820
821                 block_end = block_start + bsize;
822                 if (block_end <= from)
823                         goto next_bh;
824                 if (block_start >= to)
825                         break;
826
827                 kaddr = kmap_atomic(page, KM_USER0);
828                 memset(kaddr+block_start, 0, bh->b_size);
829                 flush_dcache_page(page);
830                 kunmap_atomic(kaddr, KM_USER0);
831                 set_buffer_uptodate(bh);
832                 mark_buffer_dirty(bh);
833
834 next_bh:
835                 block_start = block_end;
836                 bh = bh->b_this_page;
837         } while (bh != head);
838
839         return ret;
840 }
841
842 /*
843  * This will copy user data from the iovec in the buffered write
844  * context.
845  */
846 int ocfs2_map_and_write_user_data(struct inode *inode,
847                                   struct ocfs2_write_ctxt *wc, u64 *p_blkno,
848                                   unsigned int *ret_from, unsigned int *ret_to)
849 {
850         int ret;
851         unsigned int to, from, cluster_start, cluster_end;
852         unsigned long bytes, src_from;
853         char *dst;
854         struct ocfs2_buffered_write_priv *bp = wc->w_private;
855         const struct iovec *cur_iov = bp->b_cur_iov;
856         char __user *buf;
857         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
858
859         ocfs2_figure_cluster_boundaries(osb, wc->w_cpos, &cluster_start,
860                                         &cluster_end);
861
862         buf = cur_iov->iov_base + bp->b_cur_off;
863         src_from = (unsigned long)buf & ~PAGE_CACHE_MASK;
864
865         from = wc->w_pos & (PAGE_CACHE_SIZE - 1);
866
867         /*
868          * This is a lot of comparisons, but it reads quite
869          * easily, which is important here.
870          */
871         /* Stay within the src page */
872         bytes = PAGE_SIZE - src_from;
873         /* Stay within the vector */
874         bytes = min(bytes,
875                     (unsigned long)(cur_iov->iov_len - bp->b_cur_off));
876         /* Stay within count */
877         bytes = min(bytes, (unsigned long)wc->w_count);
878         /*
879          * For clustersize > page size, just stay within
880          * target page, otherwise we have to calculate pos
881          * within the cluster and obey the rightmost
882          * boundary.
883          */
884         if (wc->w_large_pages) {
885                 /*
886                  * For cluster size < page size, we have to
887                  * calculate pos within the cluster and obey
888                  * the rightmost boundary.
889                  */
890                 bytes = min(bytes, (unsigned long)(osb->s_clustersize
891                                    - (wc->w_pos & (osb->s_clustersize - 1))));
892         } else {
893                 /*
894                  * cluster size > page size is the most common
895                  * case - we just stay within the target page
896                  * boundary.
897                  */
898                 bytes = min(bytes, PAGE_CACHE_SIZE - from);
899         }
900
901         to = from + bytes;
902
903         if (wc->w_this_page_new)
904                 ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
905                                             cluster_start, cluster_end, 1);
906         else
907                 ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
908                                             from, to, 0);
909         if (ret) {
910                 mlog_errno(ret);
911                 goto out;
912         }
913
914         BUG_ON(from > PAGE_CACHE_SIZE);
915         BUG_ON(to > PAGE_CACHE_SIZE);
916         BUG_ON(from > osb->s_clustersize);
917         BUG_ON(to > osb->s_clustersize);
918
919         dst = kmap(wc->w_this_page);
920         memcpy(dst + from, bp->b_src_buf + src_from, bytes);
921         kunmap(wc->w_this_page);
922
923         /*
924          * XXX: This is slow, but simple. The caller of
925          * ocfs2_buffered_write_cluster() is responsible for
926          * passing through the iovecs, so it's difficult to
927          * predict what our next step is in here after our
928          * initial write. A future version should be pushing
929          * that iovec manipulation further down.
930          *
931          * By setting this, we indicate that a copy from user
932          * data was done, and subsequent calls for this
933          * cluster will skip copying more data.
934          */
935         wc->w_finished_copy = 1;
936
937         *ret_from = from;
938         *ret_to = to;
939 out:
940
941         return bytes ? (unsigned int)bytes : ret;
942 }
943
944 /*
945  * Map, fill and write a page to disk.
946  *
947  * The work of copying data is done via callback.  Newly allocated
948  * pages which don't take user data will be zero'd (set 'new' to
949  * indicate an allocating write)
950  *
951  * Returns a negative error code or the number of bytes copied into
952  * the page.
953  */
954 int ocfs2_write_data_page(struct inode *inode, handle_t *handle,
955                           u64 *p_blkno, struct page *page,
956                           struct ocfs2_write_ctxt *wc, int new)
957 {
958         int ret, copied = 0;
959         unsigned int from = 0, to = 0;
960         unsigned int cluster_start, cluster_end;
961         unsigned int zero_from = 0, zero_to = 0;
962
963         ocfs2_figure_cluster_boundaries(OCFS2_SB(inode->i_sb), wc->w_cpos,
964                                         &cluster_start, &cluster_end);
965
966         if ((wc->w_pos >> PAGE_CACHE_SHIFT) == page->index
967             && !wc->w_finished_copy) {
968
969                 wc->w_this_page = page;
970                 wc->w_this_page_new = new;
971                 ret = wc->w_write_data_page(inode, wc, p_blkno, &from, &to);
972                 if (ret < 0) {
973                         mlog_errno(ret);
974                         goto out;
975                 }
976
977                 copied = ret;
978
979                 zero_from = from;
980                 zero_to = to;
981                 if (new) {
982                         from = cluster_start;
983                         to = cluster_end;
984                 }
985         } else {
986                 /*
987                  * If we haven't allocated the new page yet, we
988                  * shouldn't be writing it out without copying user
989                  * data. This is likely a math error from the caller.
990                  */
991                 BUG_ON(!new);
992
993                 from = cluster_start;
994                 to = cluster_end;
995
996                 ret = ocfs2_map_page_blocks(page, p_blkno, inode,
997                                             cluster_start, cluster_end, 1);
998                 if (ret) {
999                         mlog_errno(ret);
1000                         goto out;
1001                 }
1002         }
1003
1004         /*
1005          * Parts of newly allocated pages need to be zero'd.
1006          *
1007          * Above, we have also rewritten 'to' and 'from' - as far as
1008          * the rest of the function is concerned, the entire cluster
1009          * range inside of a page needs to be written.
1010          *
1011          * We can skip this if the page is up to date - it's already
1012          * been zero'd from being read in as a hole.
1013          */
1014         if (new && !PageUptodate(page))
1015                 ocfs2_clear_page_regions(page, OCFS2_SB(inode->i_sb),
1016                                          wc->w_cpos, zero_from, zero_to);
1017
1018         flush_dcache_page(page);
1019
1020         if (ocfs2_should_order_data(inode)) {
1021                 ret = walk_page_buffers(handle,
1022                                         page_buffers(page),
1023                                         from, to, NULL,
1024                                         ocfs2_journal_dirty_data);
1025                 if (ret < 0)
1026                         mlog_errno(ret);
1027         }
1028
1029         /*
1030          * We don't use generic_commit_write() because we need to
1031          * handle our own i_size update.
1032          */
1033         ret = block_commit_write(page, from, to);
1034         if (ret)
1035                 mlog_errno(ret);
1036 out:
1037
1038         return copied ? copied : ret;
1039 }
1040
1041 /*
1042  * Do the actual write of some data into an inode. Optionally allocate
1043  * in order to fulfill the write.
1044  *
1045  * cpos is the logical cluster offset within the file to write at
1046  *
1047  * 'phys' is the physical mapping of that offset. a 'phys' value of
1048  * zero indicates that allocation is required. In this case, data_ac
1049  * and meta_ac should be valid (meta_ac can be null if metadata
1050  * allocation isn't required).
1051  */
1052 static ssize_t ocfs2_write(struct file *file, u32 phys, handle_t *handle,
1053                            struct buffer_head *di_bh,
1054                            struct ocfs2_alloc_context *data_ac,
1055                            struct ocfs2_alloc_context *meta_ac,
1056                            struct ocfs2_write_ctxt *wc)
1057 {
1058         int ret, i, numpages = 1, new;
1059         unsigned int copied = 0;
1060         u32 tmp_pos;
1061         u64 v_blkno, p_blkno;
1062         struct address_space *mapping = file->f_mapping;
1063         struct inode *inode = mapping->host;
1064         unsigned int cbits = OCFS2_SB(inode->i_sb)->s_clustersize_bits;
1065         unsigned long index, start;
1066         struct page **cpages;
1067
1068         new = phys == 0 ? 1 : 0;
1069
1070         /*
1071          * Figure out how many pages we'll be manipulating here. For
1072          * non-allocating write, or any writes where cluster size is
1073          * less than page size, we only need one page. Otherwise,
1074          * allocating writes of cluster size larger than page size
1075          * need cluster size pages.
1076          */
1077         if (new && !wc->w_large_pages)
1078                 numpages = (1 << cbits) / PAGE_SIZE;
1079
1080         cpages = kzalloc(sizeof(*cpages) * numpages, GFP_NOFS);
1081         if (!cpages) {
1082                 ret = -ENOMEM;
1083                 mlog_errno(ret);
1084                 return ret;
1085         }
1086
1087         /*
1088          * Fill our page array first. That way we've grabbed enough so
1089          * that we can zero and flush if we error after adding the
1090          * extent.
1091          */
1092         if (new) {
1093                 start = ocfs2_align_clusters_to_page_index(inode->i_sb,
1094                                                            wc->w_cpos);
1095                 v_blkno = ocfs2_clusters_to_blocks(inode->i_sb, wc->w_cpos);
1096         } else {
1097                 start = wc->w_pos >> PAGE_CACHE_SHIFT;
1098                 v_blkno = wc->w_pos >> inode->i_sb->s_blocksize_bits;
1099         }
1100
1101         for(i = 0; i < numpages; i++) {
1102                 index = start + i;
1103
1104                 cpages[i] = grab_cache_page(mapping, index);
1105                 if (!cpages[i]) {
1106                         ret = -ENOMEM;
1107                         mlog_errno(ret);
1108                         goto out;
1109                 }
1110         }
1111
1112         if (new) {
1113                 /*
1114                  * This is safe to call with the page locks - it won't take
1115                  * any additional semaphores or cluster locks.
1116                  */
1117                 tmp_pos = wc->w_cpos;
1118                 ret = ocfs2_do_extend_allocation(OCFS2_SB(inode->i_sb), inode,
1119                                                  &tmp_pos, 1, di_bh, handle,
1120                                                  data_ac, meta_ac, NULL);
1121                 /*
1122                  * This shouldn't happen because we must have already
1123                  * calculated the correct meta data allocation required. The
1124                  * internal tree allocation code should know how to increase
1125                  * transaction credits itself.
1126                  *
1127                  * If need be, we could handle -EAGAIN for a
1128                  * RESTART_TRANS here.
1129                  */
1130                 mlog_bug_on_msg(ret == -EAGAIN,
1131                                 "Inode %llu: EAGAIN return during allocation.\n",
1132                                 (unsigned long long)OCFS2_I(inode)->ip_blkno);
1133                 if (ret < 0) {
1134                         mlog_errno(ret);
1135                         goto out;
1136                 }
1137         }
1138
1139         ret = ocfs2_extent_map_get_blocks(inode, v_blkno, &p_blkno, NULL);
1140         if (ret < 0) {
1141
1142                 /*
1143                  * XXX: Should we go readonly here?
1144                  */
1145
1146                 mlog_errno(ret);
1147                 goto out;
1148         }
1149
1150         BUG_ON(p_blkno == 0);
1151
1152         for(i = 0; i < numpages; i++) {
1153                 ret = ocfs2_write_data_page(inode, handle, &p_blkno, cpages[i],
1154                                             wc, new);
1155                 if (ret < 0) {
1156                         mlog_errno(ret);
1157                         goto out;
1158                 }
1159
1160                 copied += ret;
1161         }
1162
1163 out:
1164         for(i = 0; i < numpages; i++) {
1165                 unlock_page(cpages[i]);
1166                 mark_page_accessed(cpages[i]);
1167                 page_cache_release(cpages[i]);
1168         }
1169         kfree(cpages);
1170
1171         return copied ? copied : ret;
1172 }
1173
1174 static void ocfs2_write_ctxt_init(struct ocfs2_write_ctxt *wc,
1175                                   struct ocfs2_super *osb, loff_t pos,
1176                                   size_t count, ocfs2_page_writer *cb,
1177                                   void *cb_priv)
1178 {
1179         wc->w_count = count;
1180         wc->w_pos = pos;
1181         wc->w_cpos = wc->w_pos >> osb->s_clustersize_bits;
1182         wc->w_finished_copy = 0;
1183
1184         if (unlikely(PAGE_CACHE_SHIFT > osb->s_clustersize_bits))
1185                 wc->w_large_pages = 1;
1186         else
1187                 wc->w_large_pages = 0;
1188
1189         wc->w_write_data_page = cb;
1190         wc->w_private = cb_priv;
1191 }
1192
1193 /*
1194  * Write a cluster to an inode. The cluster may not be allocated yet,
1195  * in which case it will be. This only exists for buffered writes -
1196  * O_DIRECT takes a more "traditional" path through the kernel.
1197  *
1198  * The caller is responsible for incrementing pos, written counts, etc
1199  *
1200  * For file systems that don't support sparse files, pre-allocation
1201  * and page zeroing up until cpos should be done prior to this
1202  * function call.
1203  *
1204  * Callers should be holding i_sem, and the rw cluster lock.
1205  *
1206  * Returns the number of user bytes written, or less than zero for
1207  * error.
1208  */
1209 ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
1210                                      size_t count, ocfs2_page_writer *actor,
1211                                      void *priv)
1212 {
1213         int ret, credits = OCFS2_INODE_UPDATE_CREDITS;
1214         ssize_t written = 0;
1215         u32 phys;
1216         struct inode *inode = file->f_mapping->host;
1217         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1218         struct buffer_head *di_bh = NULL;
1219         struct ocfs2_dinode *di;
1220         struct ocfs2_alloc_context *data_ac = NULL;
1221         struct ocfs2_alloc_context *meta_ac = NULL;
1222         handle_t *handle;
1223         struct ocfs2_write_ctxt wc;
1224
1225         ocfs2_write_ctxt_init(&wc, osb, pos, count, actor, priv);
1226
1227         ret = ocfs2_meta_lock(inode, &di_bh, 1);
1228         if (ret) {
1229                 mlog_errno(ret);
1230                 goto out;
1231         }
1232         di = (struct ocfs2_dinode *)di_bh->b_data;
1233
1234         /*
1235          * Take alloc sem here to prevent concurrent lookups. That way
1236          * the mapping, zeroing and tree manipulation within
1237          * ocfs2_write() will be safe against ->readpage(). This
1238          * should also serve to lock out allocation from a shared
1239          * writeable region.
1240          */
1241         down_write(&OCFS2_I(inode)->ip_alloc_sem);
1242
1243         ret = ocfs2_get_clusters(inode, wc.w_cpos, &phys, NULL);
1244         if (ret) {
1245                 mlog_errno(ret);
1246                 goto out_meta;
1247         }
1248
1249         /* phys == 0 means that allocation is required. */
1250         if (phys == 0) {
1251                 ret = ocfs2_lock_allocators(inode, di, 1, &data_ac, &meta_ac);
1252                 if (ret) {
1253                         mlog_errno(ret);
1254                         goto out_meta;
1255                 }
1256
1257                 credits = ocfs2_calc_extend_credits(inode->i_sb, di, 1);
1258         }
1259
1260         ret = ocfs2_data_lock(inode, 1);
1261         if (ret) {
1262                 mlog_errno(ret);
1263                 goto out_meta;
1264         }
1265
1266         handle = ocfs2_start_trans(osb, credits);
1267         if (IS_ERR(handle)) {
1268                 ret = PTR_ERR(handle);
1269                 mlog_errno(ret);
1270                 goto out_data;
1271         }
1272
1273         written = ocfs2_write(file, phys, handle, di_bh, data_ac,
1274                               meta_ac, &wc);
1275         if (written < 0) {
1276                 ret = written;
1277                 mlog_errno(ret);
1278                 goto out_commit;
1279         }
1280
1281         ret = ocfs2_journal_access(handle, inode, di_bh,
1282                                    OCFS2_JOURNAL_ACCESS_WRITE);
1283         if (ret) {
1284                 mlog_errno(ret);
1285                 goto out_commit;
1286         }
1287
1288         pos += written;
1289         if (pos > inode->i_size) {
1290                 i_size_write(inode, pos);
1291                 mark_inode_dirty(inode);
1292         }
1293         inode->i_blocks = ocfs2_align_bytes_to_sectors((u64)(i_size_read(inode)));
1294         di->i_size = cpu_to_le64((u64)i_size_read(inode));
1295         inode->i_mtime = inode->i_ctime = CURRENT_TIME;
1296         di->i_mtime = di->i_ctime = cpu_to_le64(inode->i_mtime.tv_sec);
1297         di->i_mtime_nsec = di->i_ctime_nsec = cpu_to_le32(inode->i_mtime.tv_nsec);
1298
1299         ret = ocfs2_journal_dirty(handle, di_bh);
1300         if (ret)
1301                 mlog_errno(ret);
1302
1303 out_commit:
1304         ocfs2_commit_trans(osb, handle);
1305
1306 out_data:
1307         ocfs2_data_unlock(inode, 1);
1308
1309 out_meta:
1310         up_write(&OCFS2_I(inode)->ip_alloc_sem);
1311         ocfs2_meta_unlock(inode, 1);
1312
1313 out:
1314         brelse(di_bh);
1315         if (data_ac)
1316                 ocfs2_free_alloc_context(data_ac);
1317         if (meta_ac)
1318                 ocfs2_free_alloc_context(meta_ac);
1319
1320         return written ? written : ret;
1321 }
1322
1323 const struct address_space_operations ocfs2_aops = {
1324         .readpage       = ocfs2_readpage,
1325         .writepage      = ocfs2_writepage,
1326         .prepare_write  = ocfs2_prepare_write,
1327         .commit_write   = ocfs2_commit_write,
1328         .bmap           = ocfs2_bmap,
1329         .sync_page      = block_sync_page,
1330         .direct_IO      = ocfs2_direct_IO,
1331         .invalidatepage = ocfs2_invalidatepage,
1332         .releasepage    = ocfs2_releasepage,
1333         .migratepage    = buffer_migrate_page,
1334 };