4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Implementation of cl_io for LOV layer.
38 * Author: Nikita Danilov <nikita.danilov@sun.com>
39 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
42 #define DEBUG_SUBSYSTEM S_LOV
44 #include "lov_cl_internal.h"
50 static inline void lov_sub_enter(struct lov_io_sub *sub)
54 static inline void lov_sub_exit(struct lov_io_sub *sub)
59 static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
60 struct lov_io_sub *sub)
62 if (sub->sub_io != NULL) {
63 if (sub->sub_io_initialized) {
65 cl_io_fini(sub->sub_env, sub->sub_io);
67 sub->sub_io_initialized = 0;
68 lio->lis_active_subios--;
70 if (sub->sub_stripe == lio->lis_single_subio_index)
71 lio->lis_single_subio_index = -1;
72 else if (!sub->sub_borrowed)
73 OBD_FREE_PTR(sub->sub_io);
76 if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
77 if (!sub->sub_borrowed)
78 cl_env_put(sub->sub_env, &sub->sub_refcheck);
83 static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
84 int stripe, loff_t start, loff_t end)
86 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
87 struct cl_io *parent = lio->lis_cl.cis_io;
91 io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
92 io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
93 io->u.ci_setattr.sa_capa = parent->u.ci_setattr.sa_capa;
94 if (cl_io_is_trunc(io)) {
95 loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
97 new_size = lov_size_to_stripe(lsm, new_size, stripe);
98 io->u.ci_setattr.sa_attr.lvb_size = new_size;
103 struct cl_object *obj = parent->ci_obj;
104 loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
106 io->u.ci_fault = parent->u.ci_fault;
107 off = lov_size_to_stripe(lsm, off, stripe);
108 io->u.ci_fault.ft_index = cl_index(obj, off);
112 io->u.ci_fsync.fi_start = start;
113 io->u.ci_fsync.fi_end = end;
114 io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa;
115 io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
116 io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
121 io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
122 if (cl_io_is_append(parent)) {
123 io->u.ci_wr.wr_append = 1;
125 io->u.ci_rw.crw_pos = start;
126 io->u.ci_rw.crw_count = end - start;
135 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
136 struct lov_io_sub *sub)
138 struct lov_object *lov = lio->lis_object;
139 struct lov_device *ld = lu2lov_dev(lov2cl(lov)->co_lu.lo_dev);
140 struct cl_io *sub_io;
141 struct cl_object *sub_obj;
142 struct cl_io *io = lio->lis_cl.cis_io;
144 int stripe = sub->sub_stripe;
147 LASSERT(sub->sub_io == NULL);
148 LASSERT(sub->sub_env == NULL);
149 LASSERT(sub->sub_stripe < lio->lis_stripe_count);
152 sub->sub_io_initialized = 0;
153 sub->sub_borrowed = 0;
155 if (lio->lis_mem_frozen) {
156 LASSERT(mutex_is_locked(&ld->ld_mutex));
157 sub->sub_io = &ld->ld_emrg[stripe]->emrg_subio;
158 sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
159 sub->sub_borrowed = 1;
163 /* obtain new environment */
164 cookie = cl_env_reenter();
165 sub->sub_env = cl_env_get(&sub->sub_refcheck);
166 cl_env_reexit(cookie);
167 if (IS_ERR(sub->sub_env))
168 result = PTR_ERR(sub->sub_env);
172 * First sub-io. Use ->lis_single_subio to
173 * avoid dynamic allocation.
175 if (lio->lis_active_subios == 0) {
176 sub->sub_io = &lio->lis_single_subio;
177 lio->lis_single_subio_index = stripe;
179 OBD_ALLOC_PTR(sub->sub_io);
180 if (sub->sub_io == NULL)
187 sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
188 sub_io = sub->sub_io;
190 sub_io->ci_obj = sub_obj;
191 sub_io->ci_result = 0;
193 sub_io->ci_parent = io;
194 sub_io->ci_lockreq = io->ci_lockreq;
195 sub_io->ci_type = io->ci_type;
196 sub_io->ci_no_srvlock = io->ci_no_srvlock;
199 result = cl_io_sub_init(sub->sub_env, sub_io,
200 io->ci_type, sub_obj);
203 lio->lis_active_subios++;
204 sub->sub_io_initialized = 1;
209 lov_io_sub_fini(env, lio, sub);
213 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
214 struct lov_io *lio, int stripe)
217 struct lov_io_sub *sub = &lio->lis_subs[stripe];
219 LASSERT(stripe < lio->lis_stripe_count);
221 if (!sub->sub_io_initialized) {
222 sub->sub_stripe = stripe;
223 rc = lov_io_sub_init(env, lio, sub);
233 void lov_sub_put(struct lov_io_sub *sub)
238 /*****************************************************************************
244 static int lov_page_stripe(const struct cl_page *page)
246 struct lovsub_object *subobj;
249 lu_object_locate(page->cp_child->cp_obj->co_lu.lo_header,
250 &lovsub_device_type));
251 LASSERT(subobj != NULL);
252 return subobj->lso_index;
255 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
256 const struct cl_page_slice *slice)
258 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
259 struct cl_page *page = slice->cpl_page;
262 LASSERT(lio->lis_cl.cis_io != NULL);
263 LASSERT(cl2lov(slice->cpl_obj) == lio->lis_object);
264 LASSERT(lsm != NULL);
265 LASSERT(lio->lis_nr_subios > 0);
267 stripe = lov_page_stripe(page);
268 return lov_sub_get(env, lio, stripe);
272 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
275 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
278 LASSERT(lio->lis_object != NULL);
281 * Need to be optimized, we can't afford to allocate a piece of memory
282 * when writing a page. -jay
284 OBD_ALLOC_LARGE(lio->lis_subs,
285 lsm->lsm_stripe_count * sizeof(lio->lis_subs[0]));
286 if (lio->lis_subs != NULL) {
287 lio->lis_nr_subios = lio->lis_stripe_count;
288 lio->lis_single_subio_index = -1;
289 lio->lis_active_subios = 0;
296 static void lov_io_slice_init(struct lov_io *lio,
297 struct lov_object *obj, struct cl_io *io)
300 lio->lis_object = obj;
302 LASSERT(obj->lo_lsm != NULL);
303 lio->lis_stripe_count = obj->lo_lsm->lsm_stripe_count;
305 switch (io->ci_type) {
308 lio->lis_pos = io->u.ci_rw.crw_pos;
309 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
310 lio->lis_io_endpos = lio->lis_endpos;
311 if (cl_io_is_append(io)) {
312 LASSERT(io->ci_type == CIT_WRITE);
314 lio->lis_endpos = OBD_OBJECT_EOF;
319 if (cl_io_is_trunc(io))
320 lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
323 lio->lis_endpos = OBD_OBJECT_EOF;
327 pgoff_t index = io->u.ci_fault.ft_index;
328 lio->lis_pos = cl_offset(io->ci_obj, index);
329 lio->lis_endpos = cl_offset(io->ci_obj, index + 1);
334 lio->lis_pos = io->u.ci_fsync.fi_start;
335 lio->lis_endpos = io->u.ci_fsync.fi_end;
341 lio->lis_endpos = OBD_OBJECT_EOF;
349 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
351 struct lov_io *lio = cl2lov_io(env, ios);
352 struct lov_object *lov = cl2lov(ios->cis_obj);
355 if (lio->lis_subs != NULL) {
356 for (i = 0; i < lio->lis_nr_subios; i++)
357 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
358 OBD_FREE_LARGE(lio->lis_subs,
359 lio->lis_nr_subios * sizeof(lio->lis_subs[0]));
360 lio->lis_nr_subios = 0;
363 LASSERT(atomic_read(&lov->lo_active_ios) > 0);
364 if (atomic_dec_and_test(&lov->lo_active_ios))
365 wake_up_all(&lov->lo_waitq);
368 static obd_off lov_offset_mod(obd_off val, int delta)
370 if (val != OBD_OBJECT_EOF)
375 static int lov_io_iter_init(const struct lu_env *env,
376 const struct cl_io_slice *ios)
378 struct lov_io *lio = cl2lov_io(env, ios);
379 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
380 struct lov_io_sub *sub;
387 endpos = lov_offset_mod(lio->lis_endpos, -1);
388 for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
389 if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
390 endpos, &start, &end))
393 end = lov_offset_mod(end, +1);
394 sub = lov_sub_get(env, lio, stripe);
396 lov_io_sub_inherit(sub->sub_io, lio, stripe,
398 rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
400 CDEBUG(D_VFSTRACE, "shrink: %d ["LPU64", "LPU64")\n",
406 list_add_tail(&sub->sub_linkage, &lio->lis_active);
413 static int lov_io_rw_iter_init(const struct lu_env *env,
414 const struct cl_io_slice *ios)
416 struct lov_io *lio = cl2lov_io(env, ios);
417 struct cl_io *io = ios->cis_io;
418 struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
419 __u64 start = io->u.ci_rw.crw_pos;
421 unsigned long ssize = lsm->lsm_stripe_size;
423 LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
425 /* fast path for common case. */
426 if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
428 lov_do_div64(start, ssize);
429 next = (start + 1) * ssize;
430 if (next <= start * ssize)
433 io->ci_continue = next < lio->lis_io_endpos;
434 io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
435 next) - io->u.ci_rw.crw_pos;
436 lio->lis_pos = io->u.ci_rw.crw_pos;
437 lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
438 CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
439 LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
440 (__u64)lio->lis_io_endpos);
443 * XXX The following call should be optimized: we know, that
444 * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
446 return lov_io_iter_init(env, ios);
449 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
450 int (*iofunc)(const struct lu_env *, struct cl_io *))
452 struct cl_io *parent = lio->lis_cl.cis_io;
453 struct lov_io_sub *sub;
456 list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
458 rc = iofunc(sub->sub_env, sub->sub_io);
463 if (parent->ci_result == 0)
464 parent->ci_result = sub->sub_io->ci_result;
469 static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
471 return lov_io_call(env, cl2lov_io(env, ios), cl_io_lock);
474 static int lov_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
476 return lov_io_call(env, cl2lov_io(env, ios), cl_io_start);
479 static int lov_io_end_wrapper(const struct lu_env *env, struct cl_io *io)
482 * It's possible that lov_io_start() wasn't called against this
483 * sub-io, either because previous sub-io failed, or upper layer
486 if (io->ci_state == CIS_IO_GOING)
489 io->ci_state = CIS_IO_FINISHED;
493 static int lov_io_iter_fini_wrapper(const struct lu_env *env, struct cl_io *io)
495 cl_io_iter_fini(env, io);
499 static int lov_io_unlock_wrapper(const struct lu_env *env, struct cl_io *io)
501 cl_io_unlock(env, io);
505 static void lov_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
509 rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_end_wrapper);
513 static void lov_io_iter_fini(const struct lu_env *env,
514 const struct cl_io_slice *ios)
516 struct lov_io *lio = cl2lov_io(env, ios);
519 rc = lov_io_call(env, lio, lov_io_iter_fini_wrapper);
521 while (!list_empty(&lio->lis_active))
522 list_del_init(lio->lis_active.next);
525 static void lov_io_unlock(const struct lu_env *env,
526 const struct cl_io_slice *ios)
530 rc = lov_io_call(env, cl2lov_io(env, ios), lov_io_unlock_wrapper);
535 static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
536 struct cl_page_list *qin,
539 return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
543 * lov implementation of cl_operations::cio_submit() method. It takes a list
544 * of pages in \a queue, splits it into per-stripe sub-lists, invokes
545 * cl_io_submit() on underlying devices to submit sub-lists, and then splices
548 * Major complication of this function is a need to handle memory cleansing:
549 * cl_io_submit() is called to write out pages as a part of VM memory
550 * reclamation, and hence it may not fail due to memory shortages (system
551 * dead-locks otherwise). To deal with this, some resources (sub-lists,
552 * sub-environment, etc.) are allocated per-device on "startup" (i.e., in a
553 * not-memory cleansing context), and in case of memory shortage, these
554 * pre-allocated resources are used by lov_io_submit() under
555 * lov_device::ld_mutex mutex.
557 static int lov_io_submit(const struct lu_env *env,
558 const struct cl_io_slice *ios,
559 enum cl_req_type crt, struct cl_2queue *queue)
561 struct lov_io *lio = cl2lov_io(env, ios);
562 struct lov_object *obj = lio->lis_object;
563 struct lov_device *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
564 struct cl_page_list *qin = &queue->c2_qin;
565 struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q;
566 struct cl_page_list *stripes_qin = NULL;
567 struct cl_page *page;
571 #define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
575 !(current->flags & PF_MEMALLOC);
577 if (lio->lis_active_subios == 1) {
578 int idx = lio->lis_single_subio_index;
579 struct lov_io_sub *sub;
581 LASSERT(idx < lio->lis_nr_subios);
582 sub = lov_sub_get(env, lio, idx);
583 LASSERT(!IS_ERR(sub));
584 LASSERT(sub->sub_io == &lio->lis_single_subio);
585 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
591 LASSERT(lio->lis_subs != NULL);
593 OBD_ALLOC_LARGE(stripes_qin,
594 sizeof(*stripes_qin) * lio->lis_nr_subios);
595 if (stripes_qin == NULL)
598 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
599 cl_page_list_init(&stripes_qin[stripe]);
602 * If we get here, it means pageout & swap doesn't help.
603 * In order to not make things worse, even don't try to
604 * allocate the memory with __GFP_NOWARN. -jay
606 mutex_lock(&ld->ld_mutex);
607 lio->lis_mem_frozen = 1;
610 cl_2queue_init(cl2q);
611 cl_page_list_for_each_safe(page, tmp, qin) {
612 stripe = lov_page_stripe(page);
613 cl_page_list_move(QIN(stripe), qin, page);
616 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
617 struct lov_io_sub *sub;
618 struct cl_page_list *sub_qin = QIN(stripe);
620 if (list_empty(&sub_qin->pl_pages))
623 cl_page_list_splice(sub_qin, &cl2q->c2_qin);
624 sub = lov_sub_get(env, lio, stripe);
626 rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
631 cl_page_list_splice(&cl2q->c2_qin, &queue->c2_qin);
632 cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
637 for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
638 struct cl_page_list *sub_qin = QIN(stripe);
640 if (list_empty(&sub_qin->pl_pages))
643 cl_page_list_splice(sub_qin, qin);
647 OBD_FREE_LARGE(stripes_qin,
648 sizeof(*stripes_qin) * lio->lis_nr_subios);
652 for (i = 0; i < lio->lis_nr_subios; i++) {
653 struct cl_io *cio = lio->lis_subs[i].sub_io;
655 if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
656 lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
658 lio->lis_mem_frozen = 0;
659 mutex_unlock(&ld->ld_mutex);
666 static int lov_io_prepare_write(const struct lu_env *env,
667 const struct cl_io_slice *ios,
668 const struct cl_page_slice *slice,
669 unsigned from, unsigned to)
671 struct lov_io *lio = cl2lov_io(env, ios);
672 struct cl_page *sub_page = lov_sub_page(slice);
673 struct lov_io_sub *sub;
676 sub = lov_page_subio(env, lio, slice);
678 result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
682 result = PTR_ERR(sub);
686 static int lov_io_commit_write(const struct lu_env *env,
687 const struct cl_io_slice *ios,
688 const struct cl_page_slice *slice,
689 unsigned from, unsigned to)
691 struct lov_io *lio = cl2lov_io(env, ios);
692 struct cl_page *sub_page = lov_sub_page(slice);
693 struct lov_io_sub *sub;
696 sub = lov_page_subio(env, lio, slice);
698 result = cl_io_commit_write(sub->sub_env, sub->sub_io,
702 result = PTR_ERR(sub);
706 static int lov_io_fault_start(const struct lu_env *env,
707 const struct cl_io_slice *ios)
709 struct cl_fault_io *fio;
711 struct lov_io_sub *sub;
713 fio = &ios->cis_io->u.ci_fault;
714 lio = cl2lov_io(env, ios);
715 sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
716 sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
718 return lov_io_start(env, ios);
721 static void lov_io_fsync_end(const struct lu_env *env,
722 const struct cl_io_slice *ios)
724 struct lov_io *lio = cl2lov_io(env, ios);
725 struct lov_io_sub *sub;
726 unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written;
729 list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
730 struct cl_io *subio = sub->sub_io;
733 lov_io_end_wrapper(sub->sub_env, subio);
736 if (subio->ci_result == 0)
737 *written += subio->u.ci_fsync.fi_nr_written;
741 static const struct cl_io_operations lov_io_ops = {
744 .cio_fini = lov_io_fini,
745 .cio_iter_init = lov_io_rw_iter_init,
746 .cio_iter_fini = lov_io_iter_fini,
747 .cio_lock = lov_io_lock,
748 .cio_unlock = lov_io_unlock,
749 .cio_start = lov_io_start,
750 .cio_end = lov_io_end
753 .cio_fini = lov_io_fini,
754 .cio_iter_init = lov_io_rw_iter_init,
755 .cio_iter_fini = lov_io_iter_fini,
756 .cio_lock = lov_io_lock,
757 .cio_unlock = lov_io_unlock,
758 .cio_start = lov_io_start,
759 .cio_end = lov_io_end
762 .cio_fini = lov_io_fini,
763 .cio_iter_init = lov_io_iter_init,
764 .cio_iter_fini = lov_io_iter_fini,
765 .cio_lock = lov_io_lock,
766 .cio_unlock = lov_io_unlock,
767 .cio_start = lov_io_start,
768 .cio_end = lov_io_end
771 .cio_fini = lov_io_fini,
772 .cio_iter_init = lov_io_iter_init,
773 .cio_iter_fini = lov_io_iter_fini,
774 .cio_lock = lov_io_lock,
775 .cio_unlock = lov_io_unlock,
776 .cio_start = lov_io_fault_start,
777 .cio_end = lov_io_end
780 .cio_fini = lov_io_fini,
781 .cio_iter_init = lov_io_iter_init,
782 .cio_iter_fini = lov_io_iter_fini,
783 .cio_lock = lov_io_lock,
784 .cio_unlock = lov_io_unlock,
785 .cio_start = lov_io_start,
786 .cio_end = lov_io_fsync_end
789 .cio_fini = lov_io_fini
794 .cio_submit = lov_io_submit
797 .cio_submit = lov_io_submit
800 .cio_prepare_write = lov_io_prepare_write,
801 .cio_commit_write = lov_io_commit_write
804 /*****************************************************************************
806 * Empty lov io operations.
810 static void lov_empty_io_fini(const struct lu_env *env,
811 const struct cl_io_slice *ios)
813 struct lov_object *lov = cl2lov(ios->cis_obj);
815 if (atomic_dec_and_test(&lov->lo_active_ios))
816 wake_up_all(&lov->lo_waitq);
819 static void lov_empty_impossible(const struct lu_env *env,
820 struct cl_io_slice *ios)
825 #define LOV_EMPTY_IMPOSSIBLE ((void *)lov_empty_impossible)
828 * An io operation vector for files without stripes.
830 static const struct cl_io_operations lov_empty_io_ops = {
833 .cio_fini = lov_empty_io_fini,
835 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
836 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
837 .cio_start = LOV_EMPTY_IMPOSSIBLE,
838 .cio_end = LOV_EMPTY_IMPOSSIBLE
842 .cio_fini = lov_empty_io_fini,
843 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
844 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
845 .cio_start = LOV_EMPTY_IMPOSSIBLE,
846 .cio_end = LOV_EMPTY_IMPOSSIBLE
849 .cio_fini = lov_empty_io_fini,
850 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
851 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
852 .cio_start = LOV_EMPTY_IMPOSSIBLE,
853 .cio_end = LOV_EMPTY_IMPOSSIBLE
856 .cio_fini = lov_empty_io_fini,
857 .cio_iter_init = LOV_EMPTY_IMPOSSIBLE,
858 .cio_lock = LOV_EMPTY_IMPOSSIBLE,
859 .cio_start = LOV_EMPTY_IMPOSSIBLE,
860 .cio_end = LOV_EMPTY_IMPOSSIBLE
863 .cio_fini = lov_empty_io_fini
866 .cio_fini = lov_empty_io_fini
871 .cio_submit = LOV_EMPTY_IMPOSSIBLE
874 .cio_submit = LOV_EMPTY_IMPOSSIBLE
877 .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
880 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
883 struct lov_io *lio = lov_env_io(env);
884 struct lov_object *lov = cl2lov(obj);
886 INIT_LIST_HEAD(&lio->lis_active);
887 lov_io_slice_init(lio, lov, io);
888 if (io->ci_result == 0) {
889 io->ci_result = lov_io_subio_init(env, lio, io);
890 if (io->ci_result == 0) {
891 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
892 atomic_inc(&lov->lo_active_ios);
895 return io->ci_result;
898 int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
901 struct lov_object *lov = cl2lov(obj);
902 struct lov_io *lio = lov_env_io(env);
905 lio->lis_object = lov;
906 switch (io->ci_type) {
922 CERROR("Page fault on a file without stripes: "DFID"\n",
923 PFID(lu_object_fid(&obj->co_lu)));
927 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
928 atomic_inc(&lov->lo_active_ios);
931 io->ci_result = result < 0 ? result : 0;
935 int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
938 struct lov_object *lov = cl2lov(obj);
939 struct lov_io *lio = lov_env_io(env);
942 LASSERT(lov->lo_lsm != NULL);
943 lio->lis_object = lov;
945 switch (io->ci_type) {
947 LASSERTF(0, "invalid type %d\n", io->ci_type);
956 /* TODO: need to restore the file. */
961 cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
962 atomic_inc(&lov->lo_active_ios);
965 io->ci_result = result < 0 ? result : 0;