/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "../include/obd.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
struct osc_brw_async_args {
    struct obdo       *aa_oa;
    int                aa_requested_nob;
    int                aa_nio_count;
    u32                aa_page_count;
    int                aa_resends;
    struct brw_page  **aa_ppga;
    struct client_obd *aa_cli;
    struct list_head   aa_oaps;
    struct list_head   aa_exts;
    struct cl_req     *aa_clerq;
};
struct osc_async_args {
    struct obd_info *aa_oi;
};
struct osc_setattr_args {
    struct obdo          *sa_oa;
    obd_enqueue_update_f  sa_upcall;
    void                 *sa_cookie;
};
struct osc_fsync_args {
    struct obd_info      *fa_oi;
    obd_enqueue_update_f  fa_upcall;
    void                 *fa_cookie;
};
struct osc_enqueue_args {
    struct obd_export        *oa_exp;
    __u64                    *oa_flags;
    obd_enqueue_update_f      oa_upcall;
    void                     *oa_cookie;
    struct ost_lvb           *oa_lvb;
    struct lustre_handle     *oa_lockh;
    struct ldlm_enqueue_info *oa_ei;
    unsigned int              oa_agl:1;
};
static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
static int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
    lmm_size = sizeof(**lmmp);

    if (*lmmp != NULL && lsm == NULL) {
    } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {

    *lmmp = kzalloc(lmm_size, GFP_NOFS);

    ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
    struct obd_import *imp = class_exp2cliimp(exp);

    if (lmm_bytes < sizeof(*lmm)) {
        CERROR("%s: lov_mds_md too small: %d, need %d\n",
               exp->exp_obd->obd_name, lmm_bytes,

    /* XXX LOV_MAGIC etc check? */
    if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
        CERROR("%s: zero lmm_object_id: rc = %d\n",
               exp->exp_obd->obd_name, -EINVAL);

    lsm_size = lov_stripe_md_size(1);

    if (*lsmp != NULL && lmm == NULL) {
        kfree((*lsmp)->lsm_oinfo[0]);

    *lsmp = kzalloc(lsm_size, GFP_NOFS);
    if (unlikely(*lsmp == NULL))

    (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
    if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {

    loi_init((*lsmp)->lsm_oinfo[0]);
    } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {

    /* XXX zero *lsmp? */
    ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
        (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
    else
        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
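/* Copy the fields of oinfo->oi_oa into the OST_BODY buffer of @req,
 * translating the obdo to wire format via lustre_set_wire_obdo(). */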
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
    struct ost_body *body;

    body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                         oinfo->oi_oa);

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
    struct ost_body *body;

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
    lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                         aa->aa_oi->oi_oa, &body->oa);

    /* This should really be sent by the OST */
    aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
    aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

    CDEBUG(D_INFO, "can't unpack ost_body\n");

    aa->aa_oi->oi_oa->o_valid = 0;

    rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
    struct ptlrpc_request *req;
    struct osc_async_args *aa;

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);

    ptlrpc_request_free(req);

    osc_pack_req_body(req, oinfo);

    ptlrpc_request_set_replen(req);
    req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

    CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
    aa = ptlrpc_req_async_args(req);

    ptlrpc_set_add_req(set, req);

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
    struct ptlrpc_request *req;
    struct ost_body *body;

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);

    ptlrpc_request_free(req);

    osc_pack_req_body(req, oinfo);

    ptlrpc_request_set_replen(req);

    rc = ptlrpc_queue_wait(req);

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
    lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,

    oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
    oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

    ptlrpc_req_finished(req);

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
    struct ptlrpc_request *req;
    struct ost_body *body;

    LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);

    ptlrpc_request_free(req);

    osc_pack_req_body(req, oinfo);

    ptlrpc_request_set_replen(req);

    rc = ptlrpc_queue_wait(req);

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,

    ptlrpc_req_finished(req);
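/* Completion handler for an asynchronous OST_SETATTR: unpack the reply
 * obdo and pass the result to the caller's upcall with the cookie stashed
 * in osc_setattr_args. */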
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
    struct ost_body *body;

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,

    rc = sa->sa_upcall(sa->sa_cookie, rc);

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
    struct ptlrpc_request *req;
    struct osc_setattr_args *sa;

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);

    ptlrpc_request_free(req);

    if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
        oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

    osc_pack_req_body(req, oinfo);

    ptlrpc_request_set_replen(req);

    /* do mds to ost setattr asynchronously */

    /* Do not wait for response. */
    ptlrpcd_add_req(req);

    req->rq_interpret_reply =
        (ptlrpc_interpterer_t)osc_setattr_interpret;

    CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
    sa = ptlrpc_req_async_args(req);
    sa->sa_oa = oinfo->oi_oa;
    sa->sa_upcall = upcall;
    sa->sa_cookie = cookie;

    if (rqset == PTLRPCD_SET)
        ptlrpcd_add_req(req);

    ptlrpc_set_add_req(rqset, req);

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
    return osc_setattr_async_base(exp, oinfo, oti,
                                  oinfo->oi_cb_up, oinfo, rqset);

static int osc_real_create(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md **ea,
                           struct obd_trans_info *oti)
    struct ptlrpc_request *req;
    struct ost_body *body;
    struct lov_stripe_md *lsm;

    rc = obd_alloc_memmd(exp, &lsm);

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);

    ptlrpc_request_free(req);

    body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

    ptlrpc_request_set_replen(req);

    if ((oa->o_valid & OBD_MD_FLFLAGS) &&
        oa->o_flags == OBD_FL_DELORPHAN) {
              "delorphan from OST integration");
        /* Don't resend the delorphan req */
        req->rq_no_resend = req->rq_no_delay = 1;

    rc = ptlrpc_queue_wait(req);

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
    lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

    oa->o_blksize = cli_brw_size(exp->exp_obd);
    oa->o_valid |= OBD_MD_FLBLKSZ;

    /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
     * have valid lsm_oinfo data structs, so don't go touching that.
     * This needs to be fixed in a big way.
     */
    lsm->lsm_oi = oa->o_oi;

    oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

    if (oa->o_valid & OBD_MD_FLCOOKIE) {
        if (!oti->oti_logcookies)
            oti_alloc_cookies(oti, 1);
        *oti->oti_logcookies = oa->o_lcookie;

    CDEBUG(D_HA, "transno: %lld\n",
           lustre_msg_get_transno(req->rq_repmsg));

    ptlrpc_req_finished(req);

    obd_free_memmd(exp, &lsm);
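/* Send an OST_PUNCH (truncate/hole-punch) request for the extent described
 * in oinfo->oi_oa. The request is issued asynchronously; @upcall is invoked
 * with @cookie when the reply is interpreted. */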
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
    struct ptlrpc_request *req;
    struct osc_setattr_args *sa;
    struct ost_body *body;

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);

    ptlrpc_request_free(req);

    req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
    ptlrpc_at_set_req_timeout(req);

    body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,

    ptlrpc_request_set_replen(req);

    req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
    CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
    sa = ptlrpc_req_async_args(req);
    sa->sa_oa = oinfo->oi_oa;
    sa->sa_upcall = upcall;
    sa->sa_cookie = cookie;
    if (rqset == PTLRPCD_SET)
        ptlrpcd_add_req(req);

    ptlrpc_set_add_req(rqset, req);

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
    struct osc_fsync_args *fa = arg;
    struct ost_body *body;

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    CERROR("can't unpack ost_body\n");

    *fa->fa_oi->oi_oa = body->oa;

    rc = fa->fa_upcall(fa->fa_cookie, rc);
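/* Flush an object's dirty data on the OST. The range to sync is carried in
 * the overloaded size/blocks fields of the oa (see the comment below); the
 * reply handler forwards the result to @upcall with @cookie. */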
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
    struct ptlrpc_request *req;
    struct ost_body *body;
    struct osc_fsync_args *fa;

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);

    ptlrpc_request_free(req);

    /* overload the size and blocks fields in the oa with start/end */
    body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,

    ptlrpc_request_set_replen(req);
    req->rq_interpret_reply = osc_sync_interpret;

    CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
    fa = ptlrpc_req_async_args(req);

    fa->fa_upcall = upcall;
    fa->fa_cookie = cookie;

    if (rqset == PTLRPCD_SET)
        ptlrpcd_add_req(req);

    ptlrpc_set_add_req(rqset, req);

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to that list.
 */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
    struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
    struct ldlm_res_id res_id;
    struct ldlm_resource *res;

    /* Return, i.e. cancel nothing, only if ELC is supported (flag in
     * export) but disabled through procfs (flag in NS).
     *
     * This distinguishes it from the case where ELC is not supported at
     * all, in which we still want to cancel locks in advance and just
     * cancel them locally, without sending any RPC. */
    if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))

    ostid_build_res_name(&oa->o_oi, &res_id);
    res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

    LDLM_RESOURCE_ADDREF(res);
    count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                       lock_flags, 0, NULL);
    LDLM_RESOURCE_DELREF(res);
    ldlm_resource_putref(res);

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
    struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

    atomic_dec(&cli->cl_destroy_in_flight);
    wake_up(&cli->cl_destroy_waitq);
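/* Returns true if a new destroy RPC may be sent without exceeding
 * cl_max_rpcs_in_flight. The inc/dec pair is deliberately racy: if the
 * counter was lowered again between the two atomic operations, wake a
 * waiter so the freed slot is not lost. */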
static int osc_can_send_destroy(struct client_obd *cli)
    if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
        cli->cl_max_rpcs_in_flight) {
        /* The destroy request can be sent */

    if (atomic_dec_return(&cli->cl_destroy_in_flight) <
        cli->cl_max_rpcs_in_flight) {
        /*
         * The counter has been modified between the two atomic
         */
        wake_up(&cli->cl_destroy_waitq);

static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct lov_stripe_md **ea,
                      struct obd_trans_info *oti)
    LASSERT(oa->o_valid & OBD_MD_FLGROUP);

    if ((oa->o_valid & OBD_MD_FLFLAGS) &&
        oa->o_flags == OBD_FL_RECREATE_OBJS) {
        return osc_real_create(exp, oa, ea, oti);

    if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
        return osc_real_create(exp, oa, ea, oti);

    /* we should not get here anymore */

/* Destroy requests can always be asynchronous on the client, and we don't
 * even really care about the return code, since the client cannot do
 * anything at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export)
    struct client_obd *cli = &exp->exp_obd->u.cli;
    struct ptlrpc_request *req;
    struct ost_body *body;

    CDEBUG(D_INFO, "oa NULL\n");

    count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                    LDLM_FL_DISCARD_DATA);

    req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);

    ldlm_lock_list_put(&cancels, l_bl_ast, count);

    rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,

    ptlrpc_request_free(req);

    req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
    ptlrpc_at_set_req_timeout(req);

    if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
        oa->o_lcookie = *oti->oti_logcookies;
    body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

    lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

    ptlrpc_request_set_replen(req);

    /* If osc_destroy is for destroying the unlink orphan, sent from MDT
     * to OST, it should not be blocked here, because the process might
     * be triggered by ptlrpcd, and it is not good to block a ptlrpcd
     * thread (b=16006). */
    if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
            struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,

            /*
             * Wait until the number of on-going destroy RPCs drops
             * under max_rpc_in_flight.
             */
            l_wait_event_exclusive(cli->cl_destroy_waitq,
                                   osc_can_send_destroy(cli), &lwi);

    /* Do not wait for response */
    ptlrpcd_add_req(req);
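/* Fill in the dirty/grant accounting fields of @oa so the OST learns how
 * much dirty data this client caches and how much grant it still holds.
 * The CERROR branches below are consistency checks on those counters. */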
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,

    u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

    LASSERT(!(oa->o_valid & bits));

    client_obd_list_lock(&cli->cl_loi_list_lock);
    oa->o_dirty = cli->cl_dirty;
    if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                 cli->cl_dirty_max)) {
        CERROR("dirty %lu - %lu > dirty_max %lu\n",
               cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
    } else if (unlikely(atomic_read(&obd_dirty_pages) -
                        atomic_read(&obd_dirty_transit_pages) >
                        (long)(obd_max_dirty_pages + 1))) {
        /* The atomic_read()s and the atomic_inc()s are not covered by a
         * lock, so they may safely race and trip this CERROR() unless we
         * add in a small fudge factor (+1). */
        CERROR("dirty %d - %d > system dirty_max %d\n",
               atomic_read(&obd_dirty_pages),
               atomic_read(&obd_dirty_transit_pages),
               obd_max_dirty_pages);
    } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
        CERROR("dirty %lu - dirty_max %lu too big???\n",
               cli->cl_dirty, cli->cl_dirty_max);

        long max_in_flight = (cli->cl_max_pages_per_rpc <<
                              PAGE_CACHE_SHIFT) *
                             (cli->cl_max_rpcs_in_flight + 1);
        oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);

    oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
    oa->o_dropped = cli->cl_lost_grant;
    cli->cl_lost_grant = 0;
    client_obd_list_unlock(&cli->cl_loi_list_lock);
    CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
           oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

void osc_update_next_shrink(struct client_obd *cli)
    cli->cl_next_shrink_grant =
        cfs_time_shift(cli->cl_grant_shrink_interval);
    CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
           cli->cl_next_shrink_grant);
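/* Grant bookkeeping: a server reply may carry extra grant in oa.o_grant;
 * fold it into cl_avail_grant under the loi list lock. */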
static void __osc_update_grant(struct client_obd *cli, u64 grant)
    client_obd_list_lock(&cli->cl_loi_list_lock);
    cli->cl_avail_grant += grant;
    client_obd_list_unlock(&cli->cl_loi_list_lock);

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
    if (body->oa.o_valid & OBD_MD_FLGRANT) {
        CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
        __osc_update_grant(cli, body->oa.o_grant);

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key, u32 vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
    struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
    struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
    struct ost_body *body;

    __osc_update_grant(cli, oa->o_grant);

    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    osc_update_grant(cli, body);

    kmem_cache_free(obdo_cachep, oa);

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
    client_obd_list_lock(&cli->cl_loi_list_lock);
    oa->o_grant = cli->cl_avail_grant / 4;
    cli->cl_avail_grant -= oa->o_grant;
    client_obd_list_unlock(&cli->cl_loi_list_lock);
    if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
        oa->o_valid |= OBD_MD_FLFLAGS;

    oa->o_flags |= OBD_FL_SHRINK_GRANT;
    osc_update_next_shrink(cli);

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
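/* For example (assuming 4 KiB pages, i.e. PAGE_CACHE_SHIFT == 12): with
 * cl_max_pages_per_rpc = 256 and cl_max_rpcs_in_flight = 8, the target
 * below works out to (8 + 1) * (256 << 12) bytes = 9 MiB of grant. */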
static int osc_shrink_grant(struct client_obd *cli)
    __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                         (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

    client_obd_list_lock(&cli->cl_loi_list_lock);
    if (cli->cl_avail_grant <= target_bytes)
        target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
    client_obd_list_unlock(&cli->cl_loi_list_lock);

    return osc_shrink_grant_to_target(cli, target_bytes);

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
    struct ost_body *body;

    client_obd_list_lock(&cli->cl_loi_list_lock);
    /* Don't shrink if we are already above or below the desired limit.
     * We don't want to shrink below a single RPC, as that will negatively
     * impact block allocation and long-term performance. */
    if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
        target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

    if (target_bytes >= cli->cl_avail_grant) {
        client_obd_list_unlock(&cli->cl_loi_list_lock);

    client_obd_list_unlock(&cli->cl_loi_list_lock);

    body = kzalloc(sizeof(*body), GFP_NOFS);

    osc_announce_cached(cli, &body->oa, 0);

    client_obd_list_lock(&cli->cl_loi_list_lock);
    body->oa.o_grant = cli->cl_avail_grant - target_bytes;
    cli->cl_avail_grant = target_bytes;
    client_obd_list_unlock(&cli->cl_loi_list_lock);
    if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
        body->oa.o_valid |= OBD_MD_FLFLAGS;
        body->oa.o_flags = 0;

    body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
    osc_update_next_shrink(cli);

    rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                            sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                            sizeof(*body), body, NULL);

    __osc_update_grant(cli, body->oa.o_grant);

static int osc_should_shrink_grant(struct client_obd *client)
    unsigned long time = cfs_time_current();
    unsigned long next_shrink = client->cl_next_shrink_grant;

    if ((client->cl_import->imp_connect_data.ocd_connect_flags &
         OBD_CONNECT_GRANT_SHRINK) == 0)

    if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
        /* Get the current RPC size directly, instead of going via:
         * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
         * Keep comment here so that it can be found by searching. */
        int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
            client->cl_avail_grant > brw_size)

        osc_update_next_shrink(client);
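/* Periodic timeout callback: walk the clients registered on this timeout
 * item and shrink the grant of any that qualify. */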
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
    struct client_obd *client;

    list_for_each_entry(client, &item->ti_obd_list,
                        cl_grant_shrink_list) {
        if (osc_should_shrink_grant(client))
            osc_shrink_grant(client);

static int osc_add_shrink_grant(struct client_obd *client)

    rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                   osc_grant_shrink_grant_cb, NULL,
                                   &client->cl_grant_shrink_list);

    CERROR("add grant client %s error %d\n",
           client->cl_import->imp_obd->obd_name, rc);

    CDEBUG(D_CACHE, "add grant client %s\n",
           client->cl_import->imp_obd->obd_name);
    osc_update_next_shrink(client);

static int osc_del_shrink_grant(struct client_obd *client)
    return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
    /*
     * ocd_grant is the total grant amount we expect to hold: if we have
     * been evicted, it's the new avail_grant amount, and cl_dirty will
     * drop to 0 as in-flight RPCs fail out; otherwise, it's avail_grant
     * + dirty.
     *
     * The race is tolerable here: if we're evicted, but imp_state already
     * left EVICTED state, then cl_dirty must be 0 already.
     */
    client_obd_list_lock(&cli->cl_loi_list_lock);
    if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
        cli->cl_avail_grant = ocd->ocd_grant;
    else
        cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

    if (cli->cl_avail_grant < 0) {
        CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
              cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
              ocd->ocd_grant, cli->cl_dirty);
        /* workaround for servers which do not have the patch from
         */
        cli->cl_avail_grant = ocd->ocd_grant;

    /* determine the appropriate chunk size used by osc_extent. */
    cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
    client_obd_list_unlock(&cli->cl_loi_list_lock);

    CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
           cli->cl_import->imp_obd->obd_name,
           cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

    if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
        list_empty(&cli->cl_grant_shrink_list))
        osc_add_shrink_grant(cli);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
                              struct brw_page **pga)

    /* skip bytes read OK */
    while (nob_read > 0) {
        LASSERT(page_count > 0);

        if (pga[i]->count > nob_read) {
            /* EOF inside this page */
            ptr = kmap(pga[i]->pg) +
                  (pga[i]->off & ~CFS_PAGE_MASK);
            memset(ptr + nob_read, 0, pga[i]->count - nob_read);

        nob_read -= pga[i]->count;

    /* zero remaining pages */
    while (page_count-- > 0) {
        ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
        memset(ptr, 0, pga[i]->count);
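/* Verify the per-niobuf return codes the server sent back for a bulk write,
 * and check that the number of bytes transferred matches what was requested.
 * Returns 0 on success or the first error found. */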
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           u32 page_count, struct brw_page **pga)

    remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                              sizeof(*remote_rcs) *
    if (remote_rcs == NULL) {
        CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");

    /* return error if any niobuf was in error */
    for (i = 0; i < niocount; i++) {
        if ((int)remote_rcs[i] < 0)
            return remote_rcs[i];

        if (remote_rcs[i] != 0) {
            CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                   i, remote_rcs[i], req);

    if (req->rq_bulk->bd_nob_transferred != requested_nob) {
        CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
               req->rq_bulk->bd_nob_transferred, requested_nob);

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
    if (p1->flag != p2->flag) {
        unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                          OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

        /* warn if we try to combine flags that we don't know to be
         * safe to combine */
        if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
            CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
                  p1->flag, p2->flag);

    return (p1->off + p1->count == p2->off);
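/* Compute the checksum of @nob bytes of bulk data spread over @pga, using
 * the hash algorithm selected by @cksum_type. The OBD_FAIL hooks below
 * deliberately corrupt the data (reads) or the checksum (writes) so the
 * checksum-recovery paths can be exercised. */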
static u32 osc_checksum_bulk(int nob, u32 pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
    struct cfs_crypto_hash_desc *hdesc;
    unsigned int bufsize;
    unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

    LASSERT(pg_count > 0);

    hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
    if (IS_ERR(hdesc)) {
        CERROR("Unable to initialize checksum hash %s\n",
               cfs_crypto_hash_name(cfs_alg));
        return PTR_ERR(hdesc);

    while (nob > 0 && pg_count > 0) {
        int count = pga[i]->count > nob ? nob : pga[i]->count;

        /* corrupt the data before we compute the checksum, to
         * simulate an OST->client data error */
        if (i == 0 && opc == OST_READ &&
            OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
            unsigned char *ptr = kmap(pga[i]->pg);
            int off = pga[i]->off & ~CFS_PAGE_MASK;

            memcpy(ptr + off, "bad1", min(4, nob));

        cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                    pga[i]->off & ~CFS_PAGE_MASK,
               "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
               pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
               (long)pga[i]->pg->flags, page_count(pga[i]->pg),
               page_private(pga[i]->pg),
               (int)(pga[i]->off & ~CFS_PAGE_MASK));

        nob -= pga[i]->count;

    err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

    cfs_crypto_hash_final(hdesc, NULL, NULL);

    /* For sending we only compute the wrong checksum instead
     * of corrupting the data so it is still correct on a redo */
    if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
        cksum++;
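/* Build a bulk read or write request: allocate the ptlrpc request (from the
 * pre-allocated pool for writes), describe the pages as niobufs, register
 * the bulk descriptor and, for writes, attach a bulk checksum. */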
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,

    struct ptlrpc_request *req;
    struct ptlrpc_bulk_desc *desc;
    struct ost_body *body;
    struct obd_ioobj *ioobj;
    struct niobuf_remote *niobuf;
    int niocount, i, requested_nob, opc, rc;
    struct osc_brw_async_args *aa;
    struct req_capsule *pill;
    struct brw_page *pg_prev;

    if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
        return -ENOMEM; /* Recoverable */
    if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
        return -EINVAL; /* Fatal */

    if ((cmd & OBD_BRW_WRITE) != 0) {
        req = ptlrpc_request_alloc_pool(cli->cl_import,
                                        &RQF_OST_BRW_WRITE);

        req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);

    for (niocount = i = 1; i < page_count; i++) {
        if (!can_merge_pages(pga[i - 1], pga[i]))

    pill = &req->rq_pill;
    req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
    req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                         niocount * sizeof(*niobuf));

    rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);

    ptlrpc_request_free(req);

    req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
    ptlrpc_at_set_req_timeout(req);
    /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
     * resend logic */
    req->rq_no_retry_einprogress = 1;

    desc = ptlrpc_prep_bulk_imp(req, page_count,
        cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
        opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,

    /* NB request now owns desc and will free it when it gets freed */

    body = req_capsule_client_get(pill, &RMF_OST_BODY);
    ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
    niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
    LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

    lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

    obdo_to_ioobj(oa, ioobj);
    ioobj->ioo_bufcnt = niocount;
    /* The high bits of ioo_max_brw tell the server the _maximum_ number of
     * bulks that might be sent for this request. The actual number is
     * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
     * sends "max - 1" for old client compatibility sending "0", and also
     * so the actual maximum is a power-of-two number, not one less.
     * LU-1431 */
    ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
    LASSERT(page_count > 0);

    for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
        struct brw_page *pg = pga[i];
        int poff = pg->off & ~CFS_PAGE_MASK;

        LASSERT(pg->count > 0);
        /* make sure there is no gap in the middle of the page array */
        LASSERTF(page_count == 1 ||
                 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                  ergo(i > 0 && i < page_count - 1,
                       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                  ergo(i == page_count - 1, poff == 0)),
                 "i: %d/%d pg: %p off: %llu, count: %u\n",
                 i, page_count, pg, pg->off, pg->count);
        LASSERTF(i == 0 || pg->off > pg_prev->off,
                 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
                 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                 pg_prev->pg, page_private(pg_prev->pg),
                 pg_prev->pg->index, pg_prev->off);
        LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                (pg->flag & OBD_BRW_SRVLOCK));

        ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
        requested_nob += pg->count;

        if (i > 0 && can_merge_pages(pg_prev, pg)) {
            niobuf->len += pg->count;

            niobuf->offset = pg->off;
            niobuf->len = pg->count;
            niobuf->flags = pg->flag;

    LASSERTF((void *)(niobuf - niocount) ==
             req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
             "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
             &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

    osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

    if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
        body->oa.o_valid |= OBD_MD_FLFLAGS;
        body->oa.o_flags = 0;

    body->oa.o_flags |= OBD_FL_RECOV_RESEND;

    if (osc_should_shrink_grant(cli))
        osc_shrink_grant_local(cli, &body->oa);

    /* size[REQ_REC_OFF] still sizeof (*body) */
    if (opc == OST_WRITE) {
        if (cli->cl_checksum &&
            !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
            /* store cl_cksum_type in a local variable since
             * it can be changed via lprocfs */
            cksum_type_t cksum_type = cli->cl_cksum_type;

            if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                oa->o_flags &= OBD_FL_LOCAL_MASK;
                body->oa.o_flags = 0;

            body->oa.o_flags |= cksum_type_pack(cksum_type);
            body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
            body->oa.o_cksum = osc_checksum_bulk(requested_nob,

            CDEBUG(D_PAGE, "checksum at write origin: %x\n",

            /* save this in 'oa', too, for later checking */
            oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
            oa->o_flags |= cksum_type_pack(cksum_type);

            /* clear out the checksum flag, in case this is a
             * resend but cl_checksum is no longer set. b=11238 */
            oa->o_valid &= ~OBD_MD_FLCKSUM;

        oa->o_cksum = body->oa.o_cksum;
        /* 1 RC per niobuf */
        req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                             sizeof(__u32) * niocount);

        if (cli->cl_checksum &&
            !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
            if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                body->oa.o_flags = 0;
            body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
            body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

    ptlrpc_request_set_replen(req);

    CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
    aa = ptlrpc_req_async_args(req);

    aa->aa_requested_nob = requested_nob;
    aa->aa_nio_count = niocount;
    aa->aa_page_count = page_count;

    INIT_LIST_HEAD(&aa->aa_oaps);

    ptlrpc_req_finished(req);
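/* A write checksum mismatch was detected: recompute the checksum locally
 * and compare it against both sides to infer where the corruption happened,
 * then log a console error describing the affected extent. */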
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                u32 page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
    cksum_type_t cksum_type;

    if (server_cksum == client_cksum) {
        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

    cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                   oa->o_flags : 0);
    new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                  cksum_type);

    if (cksum_type != client_cksum_type)
        msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
    else if (new_cksum == server_cksum)
        msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
    else if (new_cksum == client_cksum)
        msg = "changed in transit before arrival at OST";
    else
        msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

    LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                       " object "DOSTID" extent [%llu-%llu]\n",
                       msg, libcfs_nid2str(peer->nid),
                       oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                       oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                       oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                       POSTID(&oa->o_oi), pga[0]->off,
                       pga[page_count-1]->off + pga[page_count-1]->count - 1);
    CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
           client_cksum, client_cksum_type,
           server_cksum, cksum_type, new_cksum);

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
    struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
    const lnet_process_id_t *peer =
            &req->rq_import->imp_connection->c_peer;
    struct client_obd *cli = aa->aa_cli;
    struct ost_body *body;
    __u32 client_cksum = 0;

    if (rc < 0 && rc != -EDQUOT) {
        DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);

    LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
    body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

    DEBUG_REQ(D_INFO, req, "Can't unpack body\n");

    /* set/clear over quota flag for a uid/gid */
    if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
        body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
        unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

        CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
               body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,

        osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);

    osc_update_grant(cli, body);

    if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
        client_cksum = aa->aa_oa->o_cksum; /* save for later */

    if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {

        CERROR("Unexpected +ve rc %d\n", rc);

        LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

        if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))

        if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
            check_write_checksum(&body->oa, peer, client_cksum,
                                 body->oa.o_cksum, aa->aa_requested_nob,
                                 aa->aa_page_count, aa->aa_ppga,
                                 cksum_type_unpack(aa->aa_oa->o_flags)))

        rc = check_write_rcs(req, aa->aa_requested_nob,
                             aa->aa_page_count, aa->aa_ppga);

    /* The rest of this function executes only for OST_READs */

    /* if unwrap_bulk failed, return -EAGAIN to retry */
    rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);

    if (rc > aa->aa_requested_nob) {
        CERROR("Unexpected rc %d (%d requested)\n", rc,
               aa->aa_requested_nob);

    if (rc != req->rq_bulk->bd_nob_transferred) {
        CERROR("Unexpected rc %d (%d transferred)\n",
               rc, req->rq_bulk->bd_nob_transferred);

    if (rc < aa->aa_requested_nob)
        handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

    if (body->oa.o_valid & OBD_MD_FLCKSUM) {
        static int cksum_counter;
        __u32 server_cksum = body->oa.o_cksum;
        cksum_type_t cksum_type;

        cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                       body->oa.o_flags : 0);
        client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                         aa->aa_ppga, OST_READ,
                                         cksum_type);

        if (peer->nid != req->rq_bulk->bd_sender) {
            router = libcfs_nid2str(req->rq_bulk->bd_sender);

        if (server_cksum != client_cksum) {
            LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
                               req->rq_import->imp_obd->obd_name,
                               libcfs_nid2str(peer->nid),
                               body->oa.o_valid & OBD_MD_FLFID ?
                               body->oa.o_parent_seq : (__u64)0,
                               body->oa.o_valid & OBD_MD_FLFID ?
                               body->oa.o_parent_oid : 0,
                               body->oa.o_valid & OBD_MD_FLFID ?
                               body->oa.o_parent_ver : 0,
                               POSTID(&body->oa.o_oi),
                               aa->aa_ppga[0]->off,
                               aa->aa_ppga[aa->aa_page_count-1]->off +
                               aa->aa_ppga[aa->aa_page_count-1]->count -

            CERROR("client %x, server %x, cksum_type %x\n",
                   client_cksum, server_cksum, cksum_type);

            aa->aa_oa->o_cksum = client_cksum;

        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);

    } else if (unlikely(client_cksum)) {
        static int cksum_missed;

        if ((cksum_missed & (-cksum_missed)) == cksum_missed)
            CERROR("Checksum %u requested from %s but not sent\n",
                   cksum_missed, libcfs_nid2str(peer->nid));

    lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                         aa->aa_oa, &body->oa);

static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
    struct ptlrpc_request *new_req;
    struct osc_brw_async_args *new_aa;
    struct osc_async_page *oap;

    DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
              "redo for recoverable error %d", rc);

    rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                              OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                              aa->aa_cli, aa->aa_oa,
                              NULL /* lsm unused by osc currently */,
                              aa->aa_page_count, aa->aa_ppga,

    list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
        if (oap->oap_request != NULL) {
            LASSERTF(request == oap->oap_request,
                     "request %p != oap_request %p\n",
                     request, oap->oap_request);
            if (oap->oap_interrupted) {
                ptlrpc_req_finished(new_req);

    /* New request takes over pga and oaps from old request.
     * Note that copying a list_head doesn't work, need to move it... */
    new_req->rq_interpret_reply = request->rq_interpret_reply;
    new_req->rq_async_args = request->rq_async_args;
    /* cap resend delay to the current request timeout, this is similar to
     * what ptlrpc does (see after_reply()) */
    if (aa->aa_resends > new_req->rq_timeout)
        new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
    else
        new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
    new_req->rq_generation_set = 1;
    new_req->rq_import_generation = request->rq_import_generation;

    new_aa = ptlrpc_req_async_args(new_req);

    INIT_LIST_HEAD(&new_aa->aa_oaps);
    list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
    INIT_LIST_HEAD(&new_aa->aa_exts);
    list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
    new_aa->aa_resends = aa->aa_resends;

    list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
        if (oap->oap_request) {
            ptlrpc_req_finished(oap->oap_request);
            oap->oap_request = ptlrpc_request_addref(new_req);

    /* XXX: This code will run into problems if we're going to support
     * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
     * and waiting for all of them to finish. We should inherit the
     * request set from the old request. */
    ptlrpcd_add_req(new_req);

    DEBUG_REQ(D_INFO, new_req, "new request");

/*
 * ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
    struct brw_page *tmp;

    for (stride = 1; stride < num; stride = (stride * 3) + 1)

    for (i = stride; i < num; i++) {
        while (j >= stride && array[j - stride]->off > tmp->off) {
            array[j] = array[j - stride];

    } while (stride > 1);

static void osc_release_ppga(struct brw_page **ppga, u32 count)
    LASSERT(ppga != NULL);
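/* Reply handler for bulk BRW RPCs: retry recoverable failures, finish the
 * extents covered by the request, propagate size/time attributes from the
 * reply, and update the in-flight RPC accounting. */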
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
    struct osc_brw_async_args *aa = data;
    struct osc_extent *ext;
    struct osc_extent *tmp;
    struct cl_object *obj = NULL;
    struct client_obd *cli = aa->aa_cli;

    rc = osc_brw_fini_request(req, rc);
    CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
    /* When the server returns -EINPROGRESS, the client should always retry,
     * regardless of the number of times the bulk was resent already. */
    if (osc_recoverable_error(rc)) {
        if (req->rq_import_generation !=
            req->rq_import->imp_generation) {
            CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
                   req->rq_import->imp_obd->obd_name,
                   POSTID(&aa->aa_oa->o_oi), rc);
        } else if (rc == -EINPROGRESS ||
                   client_should_resend(aa->aa_resends, aa->aa_cli)) {
            rc = osc_brw_redo_request(req, aa, rc);

            CERROR("%s: too many resend retries for object: %llu:%llu, rc = %d.\n",
                   req->rq_import->imp_obd->obd_name,
                   POSTID(&aa->aa_oa->o_oi), rc);

        else if (rc == -EAGAIN || rc == -EINPROGRESS)

    list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
        if (obj == NULL && rc == 0) {
            obj = osc2cl(ext->oe_obj);

        list_del_init(&ext->oe_link);
        osc_extent_finish(env, ext, 1, rc);

    LASSERT(list_empty(&aa->aa_exts));
    LASSERT(list_empty(&aa->aa_oaps));

        struct obdo *oa = aa->aa_oa;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;

        if (oa->o_valid & OBD_MD_FLBLOCKS) {
            attr->cat_blocks = oa->o_blocks;
            valid |= CAT_BLOCKS;

        if (oa->o_valid & OBD_MD_FLMTIME) {
            attr->cat_mtime = oa->o_mtime;

        if (oa->o_valid & OBD_MD_FLATIME) {
            attr->cat_atime = oa->o_atime;

        if (oa->o_valid & OBD_MD_FLCTIME) {
            attr->cat_ctime = oa->o_ctime;

        cl_object_attr_lock(obj);
        cl_object_attr_set(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

        cl_object_put(env, obj);

    kmem_cache_free(obdo_cachep, aa->aa_oa);

    cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                      req->rq_bulk->bd_nob_transferred);
    osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
    ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

    client_obd_list_lock(&cli->cl_loi_list_lock);
    /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
     * is called so we know whether to go to sync BRWs or wait for more
     * RPCs to complete */
    if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
        cli->cl_w_in_flight--;
    else
        cli->cl_r_in_flight--;
    osc_wake_cache_waiters(cli);
    client_obd_list_unlock(&cli->cl_loi_list_lock);

    osc_io_unplug(env, cli, NULL);

/*
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list are NOT over the max pages per RPC.
 * Extents in the list must be in the OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd)
    struct ptlrpc_request *req = NULL;
    struct osc_extent *ext;
    struct brw_page **pga = NULL;
    struct osc_brw_async_args *aa = NULL;
    struct obdo *oa = NULL;
    struct osc_async_page *oap;
    struct osc_async_page *tmp;
    struct cl_req *clerq = NULL;
    enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
    struct ldlm_lock *lock = NULL;
    struct cl_req_attr *crattr = NULL;
    u64 starting_offset = OBD_OBJECT_EOF;
    u64 ending_offset = 0;
    struct ost_body *body;
    LIST_HEAD(rpc_list);

    LASSERT(!list_empty(ext_list));

    /* add pages into rpc_list to build BRW rpc */
    list_for_each_entry(ext, ext_list, oe_link) {
        LASSERT(ext->oe_state == OES_RPC);
        mem_tight |= ext->oe_memalloc;
        list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
            list_add_tail(&oap->oap_rpc_item, &rpc_list);
            if (starting_offset > oap->oap_obj_off)
                starting_offset = oap->oap_obj_off;

                LASSERT(oap->oap_page_off == 0);
            if (ending_offset < oap->oap_obj_off + oap->oap_count)
                ending_offset = oap->oap_obj_off +

                LASSERT(oap->oap_page_off + oap->oap_count ==

    mpflag = cfs_memory_pressure_get_and_set();

    crattr = kzalloc(sizeof(*crattr), GFP_NOFS);

    pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);

    oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);

    list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
        struct cl_page *page = oap2cl_page(oap);

        if (clerq == NULL) {
            clerq = cl_req_alloc(env, page, crt,
                                 1 /* only 1-object rpcs for now */);
            if (IS_ERR(clerq)) {
                rc = PTR_ERR(clerq);

            lock = oap->oap_ldlm_lock;

            oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        pga[i] = &oap->oap_brw_page;
        pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
        CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
               pga[i]->pg, page_index(oap->oap_page), oap,

        cl_req_page_add(env, clerq, page);

    /* always get the data for the obdo for the rpc */
    LASSERT(clerq != NULL);
    crattr->cra_oa = oa;
    cl_req_attr_set(env, clerq, crattr, ~0ULL);

    oa->o_handle = lock->l_remote_handle;
    oa->o_valid |= OBD_MD_FLHANDLE;

    rc = cl_req_prep(env, clerq);
    if (rc != 0) {
        CERROR("cl_req_prep failed: %d\n", rc);

    sort_brw_pages(pga, page_count);
    rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
    if (rc != 0) {
        CERROR("prep_req failed: %d\n", rc);

    req->rq_interpret_reply = brw_interpret;

    req->rq_memalloc = 1;

    /* Need to update the timestamps after the request is built in case
     * we race with setattr (locally or in queue at OST). If the OST gets
     * a later setattr before an earlier BRW (as determined by the request
     * xid), the OST will not use BRW timestamps. Sadly, there is no obvious
     * way to do this in a single call. bug 10150 */
    body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
    crattr->cra_oa = &body->oa;
    cl_req_attr_set(env, clerq, crattr,
                    OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

    lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

    CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
    aa = ptlrpc_req_async_args(req);
    INIT_LIST_HEAD(&aa->aa_oaps);
    list_splice_init(&rpc_list, &aa->aa_oaps);
    INIT_LIST_HEAD(&aa->aa_exts);
    list_splice_init(ext_list, &aa->aa_exts);
    aa->aa_clerq = clerq;

    /* queued sync pages can be torn down while the pages
     * were between the pending list and the rpc */

    list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
        /* only one oap gets a request reference */

        if (oap->oap_interrupted && !req->rq_intr) {
            CDEBUG(D_INODE, "oap %p in req %p interrupted\n",

            ptlrpc_mark_interrupted(req);

    tmp->oap_request = ptlrpc_request_addref(req);

    client_obd_list_lock(&cli->cl_loi_list_lock);
    starting_offset >>= PAGE_CACHE_SHIFT;
    if (cmd == OBD_BRW_READ) {
        cli->cl_r_in_flight++;
        lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
        lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
        lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                              starting_offset + 1);
    } else {
        cli->cl_w_in_flight++;
        lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
        lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
        lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                              starting_offset + 1);

    client_obd_list_unlock(&cli->cl_loi_list_lock);

    DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
              page_count, aa, cli->cl_r_in_flight,
              cli->cl_w_in_flight);

    ptlrpcd_add_req(req);

    cfs_memory_pressure_restore(mpflag);

    LASSERT(req == NULL);

    kmem_cache_free(obdo_cachep, oa);

    /* this should happen rarely and is pretty bad, it makes the
     * pending list not follow the dirty order */
    while (!list_empty(ext_list)) {
        ext = list_entry(ext_list->next, struct osc_extent,

        list_del_init(&ext->oe_link);
        osc_extent_finish(env, ext, 0, rc);

    if (clerq && !IS_ERR(clerq))
        cl_req_completion(env, clerq, rc);
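/* Attach cl_object data to an ldlm lock, verifying that the lock's callbacks
 * match those in @einfo so locks belonging to a different layer are never
 * mixed in. Returns nonzero if the lock now carries the data. */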
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                        struct ldlm_enqueue_info *einfo)
    void *data = einfo->ei_cbdata;

    LASSERT(lock != NULL);
    LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
    LASSERT(lock->l_resource->lr_type == einfo->ei_type);
    LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
    LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

    lock_res_and_lock(lock);
    spin_lock(&osc_ast_guard);

    if (lock->l_ast_data == NULL)
        lock->l_ast_data = data;
    if (lock->l_ast_data == data)

    spin_unlock(&osc_ast_guard);
    unlock_res_and_lock(lock);

static int osc_set_data_with_check(struct lustre_handle *lockh,
                                   struct ldlm_enqueue_info *einfo)
    struct ldlm_lock *lock = ldlm_handle2lock(lockh);

    set = osc_set_lock_data_with_check(lock, einfo);
    LDLM_LOCK_PUT(lock);

    CERROR("lockh %p, data %p - client evicted?\n",
           lockh, einfo->ei_cbdata);

/* Find any ldlm lock of the inode in osc; returns 1 if one was found,
 * 0 if not, negative errno on error. */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                           ldlm_iterator_t replace, void *data)
    struct ldlm_res_id res_id;
    struct obd_device *obd = class_exp2obd(exp);

    ostid_build_res_name(&lsm->lsm_oi, &res_id);
    rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
    if (rc == LDLM_ITER_STOP)
        return 1;
    if (rc == LDLM_ITER_CONTINUE)
        return 0;

static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            __u64 *flags, int agl, int rc)
    int intent = *flags & LDLM_FL_HAS_INTENT;

    /* The request was created before the ldlm_cli_enqueue call. */
    if (rc == ELDLM_LOCK_ABORTED) {
        struct ldlm_reply *rep;

        rep = req_capsule_server_get(&req->rq_pill,

        LASSERT(rep != NULL);
        rep->lock_policy_res1 =
            ptlrpc_status_ntoh(rep->lock_policy_res1);
        if (rep->lock_policy_res1)
            rc = rep->lock_policy_res1;

    if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
        *flags |= LDLM_FL_LVB_READY;
        CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
               lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);

    /* Call the update callback. */
    rc = (*upcall)(cookie, rc);
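/* Interpret callback for an asynchronous lock enqueue: finish the ldlm-level
 * enqueue, run the caller's upcall, then drop the references that were held
 * to keep the lock alive across the upcall. */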
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
    struct ldlm_lock *lock;
    struct lustre_handle handle;
    struct ost_lvb *lvb;
    __u64 *flags = aa->oa_flags;

    /* Make a local copy of the lock handle and the mode, because aa->oa_*
     * might be freed anytime after the lock upcall has been called. */
    lustre_handle_copy(&handle, aa->oa_lockh);
    mode = aa->oa_ei->ei_mode;

    /* ldlm_cli_enqueue is holding a reference on the lock, so it must
     * be valid. */
    lock = ldlm_handle2lock(&handle);

    /* Take an additional reference so that a blocking AST that
     * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
     * to arrive after an upcall has been executed by
     * osc_enqueue_fini(). */
    ldlm_lock_addref(&handle, mode);

    /* Let the CP AST grant the lock first. */
    OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

    if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {

        lvb_len = sizeof(*aa->oa_lvb);

    /* Complete obtaining the lock procedure. */
    rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                               mode, flags, lvb, lvb_len, &handle, rc);
    /* Complete osc stuff. */
    rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                          flags, aa->oa_agl, rc);

    OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

    /* Release the lock for async request. */
    if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
        /*
         * Release the reference taken by ldlm_cli_enqueue(), if it is
         * not already released by
         * ldlm_cli_enqueue_fini()->failed_lock_cleanup().
         */
        ldlm_lock_decref(&handle, mode);

    LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
             aa->oa_lockh, req, aa);
    ldlm_lock_decref(&handle, mode);
    LDLM_LOCK_PUT(lock);
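/* Sentinel "request set": passing PTLRPCD_SET as the rqset argument tells
 * the helpers above to hand the request straight to ptlrpcd instead of
 * adding it to a caller-owned set. */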
2180 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of OST failure; and
 * when other sync requests do not get a released lock from a client, the
 * client is excluded from the cluster -- such scenarios make life difficult,
 * so release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;
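	/* Worked example (assuming a 4 KiB page, so ~CFS_PAGE_MASK == 4095):
	 * a request for bytes [5000, 9000] becomes [4096, 12287]; the start
	 * is rounded down (5000 - (5000 & 4095) = 4096) and the end is
	 * rounded up to the last byte of its page (9000 | 4095 = 12287). */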
	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* For AGL, if the enqueue RPC is sent but the lock is
			 * not granted, then skip processing this stripe.
			 * Return -ECANCELED to tell the caller. */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		}

		if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* addref the lock only if not async requests and PW
			 * lock is matched whereas we asked for PR. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);

			/* I would like to be able to ASSERT here that
			 * rss <= kms, but I can't, for reasons which
			 * are explained in lov_enqueue() */

			/* We already have a lock, and it's referenced.
			 *
			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
			 * AGL upcall may change it to CLS_HELD directly. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* For async requests, decref the lock. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		}

		ldlm_lock_decref(lockh, mode);
		LDLM_LOCK_PUT(matched);
	}

 no_match:
	if (intent) {
		LIST_HEAD(cancels);

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			struct osc_enqueue_args *aa;

			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb = lvb;
			aa->oa_lockh = lockh;
			aa->oa_agl = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	ldlm_mode_t rc;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		return -EIO;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data) {
			if (!osc_set_data_with_check(lockh, data)) {
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				return 0;
			}
		}
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		return rc;
	}
	return rc;
}
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	if (unlikely(mode == LCK_GROUP))
		ldlm_lock_decref_and_cancel(lockh, mode);
	else
		ldlm_lock_decref(lockh, mode);

	return 0;
}
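/* For illustration only: a hedged sketch of how a caller might pair
 * osc_match_base() with osc_cancel_base(). Everything except the two
 * functions themselves is an assumption for the example. Requesting
 * LCK_PR means the matched reference is always held as PR (the PW->PR
 * swap above takes care of a PW match), so PR is the mode to drop:
 *
 *	if (osc_match_base(exp, &res_id, LDLM_EXTENT, &policy, LCK_PR,
 *			   &flags, einfo, &lockh, 0))
 *		osc_cancel_base(&lockh, LCK_PR);	// drop matched ref
 */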
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;

	if (rc == -EBADR)
		/* The request has in fact never been sent due to issues at
		 * a higher level (LOV). Exit immediately since the caller
		 * is aware of the problem and takes care of the cleanup. */
		return rc;

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
		rc = 0;
		goto out;
	}

	if (rc != 0)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for a reply, to avoid a
		 * deadlock: do not resend and do not delay. */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	return 0;
}
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;

	/* Since the request might also come from lprocfs, we need to
	 * sync this with client_disconnect_export (bug 15684). */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		return -ENODEV;

	/* The same max_age trade-off as described in osc_statfs_async()
	 * above applies here. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait, to avoid a deadlock. */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*osfs = *msfs;

out:
	ptlrpc_req_finished(req);
	return rc;
}
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm,
			 struct lov_user_md __user *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;

	if (!lsm)
		return -ENODATA;

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count, (the header part is common to v1 and v3) */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		return -EFAULT;

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		return -EINVAL;

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
	if (lum.lmm_stripe_count > 0) {
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		lumk = kzalloc(lum_size, GFP_NOFS);
		if (!lumk)
			return -ENOMEM;

		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &(lumk->lmm_objects[0]);
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		kfree(lumk);

	return rc;
}
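/* For illustration only: a hedged sketch of the user-space side of the
 * protocol osc_getstripe() implements, sized for the single slot used
 * above. The open() path is an assumption and error handling is elided:
 *
 *	struct {
 *		struct lov_user_md_v1 lum;
 *		struct lov_user_ost_data_v1 objects[1];
 *	} buf = { .lum = { .lmm_magic = LOV_USER_MAGIC_V1,
 *			   .lmm_stripe_count = 1 } };
 *	int fd = open("/mnt/lustre/file", O_RDONLY);
 *
 *	ioctl(fd, LL_IOC_LOV_GETSTRIPE, &buf);
 */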
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void __user *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("Can't get module. Is it alive?");
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_LOV_GET_CONFIG: {
		char *buf;
		struct lov_desc *desc;
		struct obd_uuid uuid;

		buf = NULL;
		len = 0;
		if (obd_ioctl_getdata(&buf, &len, uarg)) {
			err = -EINVAL;
			goto out;
		}

		data = (struct obd_ioctl_data *)buf;

		if (sizeof(*desc) > data->ioc_inllen1) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		if (data->ioc_inllen2 < sizeof(uuid)) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		desc = (struct lov_desc *)data->ioc_inlbuf1;
		desc->ld_tgt_count = 1;
		desc->ld_active_tgt_count = 1;
		desc->ld_default_stripe_count = 1;
		desc->ld_default_stripe_size = 0;
		desc->ld_default_stripe_offset = 0;
		desc->ld_pattern = 0;
		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

		err = copy_to_user(uarg, buf, len);
		if (err)
			err = -EFAULT;
		obd_ioctl_freedata(buf, len);
		goto out;
	}
	case LL_IOC_LOV_SETSTRIPE:
		err = obd_alloc_memmd(exp, karg);
		if (err > 0)
			err = 0;
		goto out;
	case LL_IOC_LOV_GETSTRIPE:
		err = osc_getstripe(karg, uarg);
		goto out;
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		if (err > 0)
			err = 0;
		goto out;
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		goto out;
	case OBD_IOC_POLL_QUOTACHECK:
		err = osc_quota_poll_check(exp, karg);
		goto out;
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		goto out;
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		err = -ENOTTY;
		goto out;
	}
out:
	module_put(THIS_MODULE);
	return err;
}
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		u64 *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto out;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL) {
			rc = -EPROTO;
			goto out;
		}

		*((u64 *)val) = *reply;
out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key = key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		ldlm_mode_t mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
					       fm_key->fiemap.fm_length +
					       PAGE_CACHE_SIZE - 1) &
					      CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, needs acquire lock on server side */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL) {
			rc = -ENOMEM;
			goto drop_lock;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			goto drop_lock;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto fini_req;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL) {
			rc = -EPROTO;
			goto fini_req;
		}

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);
	char *tmp;
	int rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = val;
		atomic_inc(&cli->cl_cache->ccc_users);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

		nr = osc_lru_shrink(cli, min(nr, target));
		*(int *)val -= nr;
		return 0;
	}
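	/* Worked example for the KEY_CACHE_LRU_SHRINK branch above (the
	 * numbers are assumptions): with 1000 pages on the LRU, nr starts
	 * at 500 (half the list); if the caller asked for 200 pages
	 * (*val == 200), min(500, 200) = 200 are shrunk and *val is
	 * decremented by the count actually freed, here leaving 0. */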
	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

	/* We pass all other commands directly to OST. Since nobody calls osc
	 * methods directly and everybody is supposed to go through LOV, we
	 * assume LOV checked invalid values for us.
	 * The only recognised values so far are evict_by_nid and mds_conn.
	 * Even if something bad goes through, we'd get a -EINVAL from OST
	 * anyway. */

	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
				   &RQF_OST_SET_GRANT_INFO :
				   &RQF_OBD_SET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_brw_async_args *aa;
		struct obdo *oa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
		if (!oa) {
			ptlrpc_req_finished(req);
			return -ENOMEM;
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else {
		ptlrpcd_add_req(req);
	}

	return 0;
}
static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
			 void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;

		client_obd_list_lock(&cli->cl_loi_list_lock);
		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
				  2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
		       data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	return 0;
}
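/* Worked example for the grant negotiation above (numbers are assumptions):
 * if the client still holds 1 MiB of available grant and has 512 KiB of
 * dirty data, it asks the server to honour 1.5 MiB on reconnect; only when
 * both are zero does it fall back to 2 * cli_brw_size(obd), i.e. two full
 * BRW RPCs worth of grant, so that I/O can resume immediately. */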
static int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *	connect p1		    disconnect p2
	 *   ptlrpc_connect_import
	 *     ...............		    class_manual_cleanup
	 *				      osc_disconnect
	 *				      del_shrink_grant
	 *   ptlrpc_connect_interrupt
	 *     init_grant_shrink
	 *       add this client to shrink list
	 *				      cleanup_osc
	 * Bang! The pinger triggers the shrink.
	 * So the osc should be disconnected from the shrink list, after we
	 * are sure the import has been destroyed. BUG18662
	 */
	if (obd->u.cli.cl_import == NULL)
		osc_del_shrink_grant(&obd->u.cli);
	return rc;
}
static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		cli = &obd->u.cli;
		client_obd_list_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env *env;
		int refcheck;

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			cli = &obd->u.cli;
			/* all pages go to failing rpcs due to the invalid
			 * import */
			osc_io_unplug(env, cli, NULL);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
			cl_env_put(env, &refcheck);
		} else {
			rc = PTR_ERR(env);
		}
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	return rc;
}
/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero	 the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_for_recovery(struct ldlm_lock *lock)
{
	check_res_locked(lock->l_resource);

	/*
	 * Cancel all unused extent locks with granted mode LCK_PR or LCK_CR.
	 *
	 * XXX As a future improvement, we could also cancel an unused write
	 * lock if it doesn't have dirty data and active mmaps.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    (lock->l_granted_mode == LCK_PR ||
	     lock->l_granted_mode == LCK_CR) &&
	    (osc_dlm_lock_pageref(lock) == 0))
		return 1;

	return 0;
}
static int brw_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL);
	return 0;
}
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct client_obd *cli = &obd->u.cli;
	void *handler;
	int rc;
	int adding;
	int added;
	int req_count;

	rc = ptlrpcd_addref();
	if (rc)
		return rc;

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		goto out_ptlrpcd;

	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_client_setup;
	}
	cli->cl_writeback_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		goto out_ptlrpcd_work;

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	lprocfs_osc_init_vars(&lvars);
	if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);
	}

	/*
	 * We try to control the total number of requests with an upper limit
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);
	}
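	/* Worked example for the pool top-up above (numbers are assumptions):
	 * with osc_reqpool_maxreqcount == 80, a current pool of 75 requests
	 * and cl_max_rpcs_in_flight == 8, we would like to add 8 + 2 = 10
	 * requests but cap it at 80 - 75 = 5 so the global limit holds. */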
	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);

	return rc;

out_ptlrpcd_work:
	ptlrpcd_destroy_work(handler);
out_client_setup:
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	return rc;
}
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
	switch (stage) {
	case OBD_CLEANUP_EARLY: {
		struct obd_import *imp;

		imp = obd->u.cli.cl_import;
		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
		ptlrpc_deactivate_import(imp);
		spin_lock(&imp->imp_lock);
		imp->imp_pingable = 0;
		spin_unlock(&imp->imp_lock);
		break;
	}
	case OBD_CLEANUP_EXPORTS: {
		struct client_obd *cli = &obd->u.cli;
		/*
		 * for echo client, export may be on zombie list, wait for
		 * zombie thread to cull it, because cli.cl_import will be
		 * cleared in client_disconnect_export():
		 *   class_export_destroy() -> obd_cleanup() ->
		 *   echo_device_free() -> echo_client_cleanup() ->
		 *   obd_disconnect() -> osc_disconnect() ->
		 *   client_disconnect_export()
		 */
		obd_zombie_barrier();
		if (cli->cl_writeback_work) {
			ptlrpcd_destroy_work(cli->cl_writeback_work);
			cli->cl_writeback_work = NULL;
		}
		obd_cleanup_client_import(obd);
		ptlrpc_lprocfs_unregister_obd(obd);
		lprocfs_obd_cleanup(obd);
		break;
	}
	}
	return 0;
}
int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		atomic_dec(&cli->cl_cache->ccc_users);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	return rc;
}
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	int rc = 0;

	lprocfs_osc_init_vars(&lvars);

	switch (lcfg->lcfg_command) {
	default:
		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
					      lcfg, obd);
		if (rc > 0)
			rc = 0;
		break;
	}

	return rc;
}

static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	return osc_process_config_base(obd, buf);
}
struct obd_ops osc_obd_ops = {
	.owner = THIS_MODULE,
	.setup = osc_setup,
	.precleanup = osc_precleanup,
	.cleanup = osc_cleanup,
	.add_conn = client_import_add_conn,
	.del_conn = client_import_del_conn,
	.connect = client_connect_import,
	.reconnect = osc_reconnect,
	.disconnect = osc_disconnect,
	.statfs = osc_statfs,
	.statfs_async = osc_statfs_async,
	.packmd = osc_packmd,
	.unpackmd = osc_unpackmd,
	.create = osc_create,
	.destroy = osc_destroy,
	.getattr = osc_getattr,
	.getattr_async = osc_getattr_async,
	.setattr = osc_setattr,
	.setattr_async = osc_setattr_async,
	.find_cbdata = osc_find_cbdata,
	.iocontrol = osc_iocontrol,
	.get_info = osc_get_info,
	.set_info_async = osc_set_info_async,
	.import_event = osc_import_event,
	.process_config = osc_process_config,
	.quotactl = osc_quotactl,
	.quotacheck = osc_quotacheck,
};
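/* For illustration only: nobody calls these methods directly; callers go
 * through the obd_class wrappers, which dispatch through this table. A
 * hedged sketch of how a statfs request would reach osc_statfs() above
 * (the exp pointer and max_age value are assumptions for the example):
 *
 *	struct obd_statfs osfs;
 *	int rc = obd_statfs(NULL, exp, &osfs, max_age, 0);
 */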
extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;
static int __init osc_init(void)
{
	struct lprocfs_static_vars lvars = { NULL };
	unsigned int reqpool_size;
	unsigned int reqsize;
	int rc;

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		return rc;

	lprocfs_osc_init_vars(&lvars);

	rc = class_register_type(&osc_obd_ops, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc)
		goto out_kmem;

	spin_lock_init(&osc_ast_guard);
	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

	/* This is obviously too much memory, only prevent overflow here */
	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
		rc = -EINVAL;
		goto out_type;
	}

	reqpool_size = osc_reqpool_mem_max << 20;

	reqsize = 1;
	while (reqsize < OST_MAXREQSIZE)
		reqsize = reqsize << 1;

	/*
	 * We don't enlarge the request count in OSC pool according to
	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
	 * tried after normal allocation failed. So a small OSC pool won't
	 * cause much performance degradation in most cases.
	 */
	osc_reqpool_maxreqcount = reqpool_size / reqsize;
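	/* Worked example for the sizing above: with the default
	 * osc_reqpool_mem_max of 5, reqpool_size is 5 << 20 bytes = 5 MiB;
	 * reqsize is the smallest power of two >= OST_MAXREQSIZE, so if
	 * OST_MAXREQSIZE were, say, 5 KiB (an assumption for illustration),
	 * reqsize would be 8 KiB and the pool would be capped at
	 * 5 MiB / 8 KiB = 640 requests. */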
	atomic_set(&osc_pool_req_count, 0);
	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
					  ptlrpc_add_rqs_to_pool);
	if (osc_rq_pool)
		return 0;

	rc = -ENOMEM;

out_type:
	class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
	lu_kmem_fini(osc_caches);
	return rc;
}
static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);