4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <linux/libcfs/libcfs.h>
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
/* Forward declarations of two file-local (static) helpers and of the
 * non-static osc_cleanup(). */
62 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63 static int brw_interpret(const struct lu_env *env,
64 struct ptlrpc_request *req, void *data, int rc);
65 int osc_cleanup(struct obd_device *obd);
67 /* Pack OSC object metadata for disk storage (LE byte order).
 *
 * The on-disk buffer size is sizeof(**lmmp).  When the caller passes an
 * existing *lmmp but no lsm, that buffer is freed.  Otherwise a buffer can
 * be allocated into *lmmp, and the object id from lsm->lsm_oi is converted
 * with ostid_cpu_to_le() into (*lmmp)->lmm_oi.
 * NOTE(review): the return statements and the conditions guarding the
 * allocation and the conversion are not visible here - confirm before
 * relying on this summary. */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69 struct lov_stripe_md *lsm)
74 lmm_size = sizeof(**lmmp);
/* existing buffer and nothing to pack: release the buffer */
78 if (*lmmp != NULL && lsm == NULL) {
79 OBD_FREE(*lmmp, lmm_size);
/* a stripe md whose object id is 0 is singled out as an unlikely case */
82 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
87 OBD_ALLOC(*lmmp, lmm_size);
93 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
98 /* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Visible behaviour:
 *  - a supplied lmm is checked for a minimum size of sizeof(*lmm) and for a
 *    non-zero object id; both failures are logged with CERROR();
 *  - with an existing *lsmp and no lmm, lsm_oinfo[0] and *lsmp are freed;
 *  - *lsmp can be allocated with lov_stripe_md_size(1) bytes together with
 *    one struct lov_oinfo, which is initialised by loi_init();
 *  - the object id is copied with ostid_le_to_cpu();
 *  - lsm_maxbytes is taken from the import connect data when
 *    OBD_CONNECT_MAXBYTES is among the connect flags;
 *    LUSTRE_STRIPE_MAXBYTES is the other value assigned.
 * NOTE(review): return statements and several branch conditions are not
 * visible here, so exact return values are not documented. */
99 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
100 struct lov_mds_md *lmm, int lmm_bytes)
103 struct obd_import *imp = class_exp2cliimp(exp);
/* NOTE(review): lmm_bytes is int while sizeof() yields size_t, so a
 * negative lmm_bytes is converted to a large unsigned value and passes
 * this check. */
107 if (lmm_bytes < sizeof(*lmm)) {
108 CERROR("%s: lov_mds_md too small: %d, need %d\n",
109 exp->exp_obd->obd_name, lmm_bytes,
113 /* XXX LOV_MAGIC etc check? */
115 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
116 CERROR("%s: zero lmm_object_id: rc = %d\n",
117 exp->exp_obd->obd_name, -EINVAL);
122 lsm_size = lov_stripe_md_size(1);
/* free request: an in-memory md exists and there is nothing to unpack */
126 if (*lsmp != NULL && lmm == NULL) {
127 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
128 OBD_FREE(*lsmp, lsm_size);
/* allocate the md first, then its lov_oinfo; the md is released again
 * when the second allocation fails */
134 OBD_ALLOC(*lsmp, lsm_size);
135 if (unlikely(*lsmp == NULL))
137 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
139 OBD_FREE(*lsmp, lsm_size);
142 loi_init((*lsmp)->lsm_oinfo[0]);
143 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
148 /* XXX zero *lsmp? */
149 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
152 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
153 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
155 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Pack a capability into the RMF_CAPA1 field of a request.
 *
 * The opaque capa pointer is viewed as struct obd_capa, the client-side
 * RMF_CAPA1 buffer is fetched from the request capsule, the body is flagged
 * with OBD_MD_FLOSSCAPA and a D_SEC debug message is emitted.
 * NOTE(review): the statements between these steps (for example any NULL
 * check on capa and the actual copy) are not visible here. */
160 static inline void osc_pack_capa(struct ptlrpc_request *req,
161 struct ost_body *body, void *capa)
163 struct obd_capa *oc = (struct obd_capa *)capa;
164 struct lustre_capa *c;
169 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
/* tell the receiver that the body is accompanied by a capability */
172 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
173 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the RMF_OST_BODY of a request from oinfo.
 *
 * Fetches the client-side ost_body from the request capsule, writes the
 * wire obdo into body->oa with lustre_set_wire_obdo() using the import
 * connect data, then packs oinfo->oi_capa with osc_pack_capa(). */
176 static inline void osc_pack_req_body(struct ptlrpc_request *req,
177 struct obd_info *oinfo)
179 struct ost_body *body;
181 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
184 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
186 osc_pack_capa(req, body, oinfo->oi_capa);
/* Adjust the client-side (RCL_CLIENT) size of a capability field in the
 * request capsule.  The visible statement sets the size of field to 0.
 * NOTE(review): the guarding condition is not visible here; presumably the
 * field is shrunk to 0 only when no capability is supplied - confirm. */
189 static inline void osc_set_capa_size(struct ptlrpc_request *req,
190 const struct req_msg_field *field,
194 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
196 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for the asynchronous OST_GETATTR request.
 *
 * Reads the RMF_OST_BODY from the reply.  On the success path the wire obdo
 * is converted into aa->aa_oi->oi_oa and o_blksize is set to DT_MAX_BRW_SIZE
 * with OBD_MD_FLBLKSZ marked valid.  On the failure path a debug message is
 * logged and o_valid is cleared.  Finally the result is handed to the
 * oi_cb_up callback of the obd_info and its return value replaces rc.
 * NOTE(review): the synchronous osc_getattr() in this file uses
 * cli_brw_size(exp->exp_obd) for o_blksize instead of DT_MAX_BRW_SIZE -
 * confirm that the difference is intended. */
200 static int osc_getattr_interpret(const struct lu_env *env,
201 struct ptlrpc_request *req,
202 struct osc_async_args *aa, int rc)
204 struct ost_body *body;
210 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
212 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
213 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
214 aa->aa_oi->oi_oa, &body->oa);
216 /* This should really be sent by the OST */
217 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
218 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
220 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* no attribute of the obdo may be trusted on this path */
222 aa->aa_oi->oi_oa->o_valid = 0;
/* report the outcome to the caller-supplied completion callback */
225 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Queue an asynchronous OST_GETATTR request on a request set.
 *
 * Allocates an RQF_OST_GETATTR request on the client import of exp, sizes
 * the capability field, packs the request (the request is freed when
 * packing fails), fills the body from oinfo, installs
 * osc_getattr_interpret() as reply interpreter and adds the request to set.
 * The async argument area of the request is used as struct osc_async_args;
 * CLASSERT() checks at build time that it fits.
 * NOTE(review): the initialisation of aa and the return statements are not
 * visible here. */
229 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
230 struct ptlrpc_request_set *set)
232 struct ptlrpc_request *req;
233 struct osc_async_args *aa;
237 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
242 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
244 ptlrpc_request_free(req);
248 osc_pack_req_body(req, oinfo);
250 ptlrpc_request_set_replen(req);
251 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
253 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
254 aa = ptlrpc_req_async_args(req);
257 ptlrpc_set_add_req(set, req);
/* Synchronous OST_GETATTR.
 *
 * Builds the same request as osc_getattr_async(), waits for the reply with
 * ptlrpc_queue_wait(), and copies the returned wire obdo into oinfo->oi_oa.
 * A reply without an ost_body yields -EPROTO.  o_blksize is set to
 * cli_brw_size(exp->exp_obd) and OBD_MD_FLBLKSZ is marked valid.  The
 * request reference is dropped with ptlrpc_req_finished(). */
261 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
262 struct obd_info *oinfo)
264 struct ptlrpc_request *req;
265 struct ost_body *body;
269 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
274 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276 ptlrpc_request_free(req);
280 osc_pack_req_body(req, oinfo);
282 ptlrpc_request_set_replen(req);
/* blocks until the reply arrives or the request fails */
284 rc = ptlrpc_queue_wait(req);
288 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290 GOTO(out, rc = -EPROTO);
292 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
293 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
296 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR.
 *
 * Asserts that the caller marked OBD_MD_FLGROUP valid in oinfo->oi_oa,
 * builds and packs an RQF_OST_SETATTR request, waits for the reply and
 * copies the returned wire obdo back into oinfo->oi_oa.  A reply without an
 * ost_body yields -EPROTO.  The request reference is dropped with
 * ptlrpc_req_finished().
 * NOTE(review): oti is not referenced in the visible lines. */
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306 struct obd_info *oinfo, struct obd_trans_info *oti)
308 struct ptlrpc_request *req;
309 struct ost_body *body;
313 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322 ptlrpc_request_free(req);
326 osc_pack_req_body(req, oinfo);
328 ptlrpc_request_set_replen(req);
330 rc = ptlrpc_queue_wait(req);
334 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336 GOTO(out, rc = -EPROTO);
338 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
343 ptlrpc_req_finished(req);
/* Reply interpreter shared by the asynchronous setattr and punch requests
 * (both install it in this file).
 *
 * Reads the RMF_OST_BODY from the reply (-EPROTO when it is missing),
 * converts the wire obdo into sa->sa_oa and passes the result to
 * sa->sa_upcall() together with sa->sa_cookie; the upcall return value
 * replaces rc. */
347 static int osc_setattr_interpret(const struct lu_env *env,
348 struct ptlrpc_request *req,
349 struct osc_setattr_args *sa, int rc)
351 struct ost_body *body;
357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359 GOTO(out, rc = -EPROTO);
361 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
364 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Build and dispatch an asynchronous OST_SETATTR request.
 *
 * exp     export whose client import carries the request
 * oinfo   supplies the obdo (oi_oa) and the capability (oi_capa)
 * oti     optional; when set and OBD_MD_FLCOOKIE is valid in the obdo, the
 *         first log cookie is copied into o_lcookie before packing
 * upcall  completion callback stored in the async args
 * cookie  opaque argument handed back to upcall
 * rqset   destination set; PTLRPCD_SET sends through ptlrpcd
 *
 * Two dispatch paths are visible: a fire-and-forget path that hands the
 * request straight to ptlrpcd without an interpreter, and a path that
 * installs osc_setattr_interpret() with upcall/cookie and then adds the
 * request either to ptlrpcd or to rqset.
 * NOTE(review): the condition selecting between the two paths and the
 * return statements are not visible here. */
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369 struct obd_trans_info *oti,
370 obd_enqueue_update_f upcall, void *cookie,
371 struct ptlrpc_request_set *rqset)
373 struct ptlrpc_request *req;
374 struct osc_setattr_args *sa;
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385 ptlrpc_request_free(req);
/* the bitwise AND binds tighter than the logical AND, so this reads as
 * oti && (o_valid & OBD_MD_FLCOOKIE) */
389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392 osc_pack_req_body(req, oinfo);
394 ptlrpc_request_set_replen(req);
396 /* do mds to ost setattr asynchronously */
398 /* Do not wait for response. */
399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401 req->rq_interpret_reply =
402 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* build-time check that the args fit into the request async area */
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
410 if (rqset == PTLRPCD_SET)
411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413 ptlrpc_set_add_req(rqset, req);
/* Asynchronous setattr entry point: forwards to osc_setattr_async_base()
 * using the oi_cb_up callback of oinfo as upcall and oinfo itself as the
 * cookie. */
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420 struct obd_trans_info *oti,
421 struct ptlrpc_request_set *rqset)
423 return osc_setattr_async_base(exp, oinfo, oti,
424 oinfo->oi_cb_up, oinfo, rqset);
/* Synchronously create an object on the OST with an OST_CREATE request.
 *
 * Visible steps: an in-memory stripe md can be allocated with
 * obd_alloc_memmd(); the request is allocated (-ENOMEM on failure), packed
 * and filled with the wire form of oa; a request whose o_flags equals
 * OBD_FL_DELORPHAN (with OBD_MD_FLFLAGS valid) is marked no-resend and
 * no-delay; after ptlrpc_queue_wait() the reply obdo is copied back into
 * oa (-EPROTO when the body is missing) and o_blksize is set to
 * cli_brw_size(exp->exp_obd).  The object id is then stored in the stripe
 * md, the transaction number and an optional log cookie are recorded in
 * oti, and the request is released with ptlrpc_req_finished().
 * obd_free_memmd() appears on the exit path.
 * NOTE(review): several guarding conditions, the handling of ea and the
 * return statements are not visible here. */
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428 struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 struct ptlrpc_request *req;
431 struct ost_body *body;
432 struct lov_stripe_md *lsm;
441 rc = obd_alloc_memmd(exp, &lsm);
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448 GOTO(out, rc = -ENOMEM);
450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452 ptlrpc_request_free(req);
456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
459 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
461 ptlrpc_request_set_replen(req);
/* o_flags is compared for equality, so only a pure DELORPHAN request
 * takes this branch */
463 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
464 oa->o_flags == OBD_FL_DELORPHAN) {
466 "delorphan from OST integration");
467 /* Don't resend the delorphan req */
468 req->rq_no_resend = req->rq_no_delay = 1;
471 rc = ptlrpc_queue_wait(req);
475 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
477 GOTO(out_req, rc = -EPROTO);
479 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
480 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
482 oa->o_blksize = cli_brw_size(exp->exp_obd);
483 oa->o_valid |= OBD_MD_FLBLKSZ;
485 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
486 * have valid lsm_oinfo data structs, so don't go touching that.
487 * This needs to be fixed in a big way.
489 lsm->lsm_oi = oa->o_oi;
493 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496 if (!oti->oti_logcookies)
497 oti_alloc_cookies(oti, 1);
498 *oti->oti_logcookies = oa->o_lcookie;
502 CDEBUG(D_HA, "transno: "LPD64"\n",
503 lustre_msg_get_transno(req->rq_repmsg));
505 ptlrpc_req_finished(req);
508 obd_free_memmd(exp, &lsm);
/* Build and dispatch an asynchronous OST_PUNCH request.
 *
 * The request is allocated and packed like the setattr requests in this
 * file, but is routed to OST_IO_PORTAL and gets its timeout refreshed with
 * ptlrpc_at_set_req_timeout().  The body is filled from oinfo and the
 * capability is packed.  osc_setattr_interpret() is installed as reply
 * interpreter with upcall and cookie stored in the async args.  The request
 * goes to ptlrpcd when rqset is PTLRPCD_SET and is otherwise added to rqset.
 * NOTE(review): the return statements are not visible here. */
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513 obd_enqueue_update_f upcall, void *cookie,
514 struct ptlrpc_request_set *rqset)
516 struct ptlrpc_request *req;
517 struct osc_setattr_args *sa;
518 struct ost_body *body;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529 ptlrpc_request_free(req);
532 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533 ptlrpc_at_set_req_timeout(req);
535 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
539 osc_pack_capa(req, body, oinfo->oi_capa);
541 ptlrpc_request_set_replen(req);
/* punch replies are handled by the setattr interpreter */
543 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
544 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
545 sa = ptlrpc_req_async_args(req);
546 sa->sa_oa = oinfo->oi_oa;
547 sa->sa_upcall = upcall;
548 sa->sa_cookie = cookie;
549 if (rqset == PTLRPCD_SET)
550 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
552 ptlrpc_set_add_req(rqset, req);
/* Punch entry point.  The extent to punch is carried in the obdo: o_size
 * receives the extent start and o_blocks the extent end from
 * oinfo->oi_policy, both fields are marked valid, and the request is then
 * issued through osc_punch_base() with oi_cb_up as upcall and oinfo as
 * cookie.  env and oti are not referenced in the visible lines. */
557 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
558 struct obd_info *oinfo, struct obd_trans_info *oti,
559 struct ptlrpc_request_set *rqset)
561 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
562 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
563 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
564 return osc_punch_base(exp, oinfo,
565 oinfo->oi_cb_up, oinfo, rqset);
/* Reply interpreter for the asynchronous OST_SYNC request.
 *
 * Reads the RMF_OST_BODY from the reply; a missing body is logged and
 * yields -EPROTO.  Otherwise the reply obdo is copied into
 * fa->fa_oi->oi_oa and the result is passed to fa->fa_upcall() with
 * fa->fa_cookie; the upcall return value replaces rc.
 * NOTE(review): the obdo is copied by plain structure assignment here,
 * whereas the other interpreters in this file convert it with
 * lustre_get_wire_obdo() - confirm that this is intended. */
568 static int osc_sync_interpret(const struct lu_env *env,
569 struct ptlrpc_request *req,
572 struct osc_fsync_args *fa = arg;
573 struct ost_body *body;
579 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
581 CERROR ("can't unpack ost_body\n");
582 GOTO(out, rc = -EPROTO);
585 *fa->fa_oi->oi_oa = body->oa;
587 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Build and dispatch an asynchronous OST_SYNC request.
 *
 * Allocates and packs an RQF_OST_SYNC request, fills the body from oinfo
 * (the size and blocks fields of the obdo carry the start and end of the
 * range, see the comment below), packs the capability and installs
 * osc_sync_interpret() with upcall and cookie stored in the async args.
 * The request goes to ptlrpcd when rqset is PTLRPCD_SET and is otherwise
 * added to rqset.
 * NOTE(review): the assignment of fa->fa_oi and the return statements are
 * not visible here. */
591 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
592 obd_enqueue_update_f upcall, void *cookie,
593 struct ptlrpc_request_set *rqset)
595 struct ptlrpc_request *req;
596 struct ost_body *body;
597 struct osc_fsync_args *fa;
601 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
605 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
606 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
608 ptlrpc_request_free(req);
612 /* overload the size and blocks fields in the oa with start/end */
613 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
617 osc_pack_capa(req, body, oinfo->oi_capa);
619 ptlrpc_request_set_replen(req);
620 req->rq_interpret_reply = osc_sync_interpret;
/* build-time check that the args fit into the request async area */
622 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
623 fa = ptlrpc_req_async_args(req);
625 fa->fa_upcall = upcall;
626 fa->fa_cookie = cookie;
628 if (rqset == PTLRPCD_SET)
629 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
631 ptlrpc_set_add_req(rqset, req);
/* Sync entry point.  Stores the range in the obdo (o_size = start,
 * o_blocks = end), marks both fields valid and issues the request through
 * osc_sync_base() with oi_cb_up as upcall and oinfo as cookie.
 * A debug message "oa NULL" is logged on a path whose condition and return
 * value are not visible here. */
636 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
637 struct obd_info *oinfo, obd_size start, obd_size end,
638 struct ptlrpc_request_set *set)
643 CDEBUG(D_INFO, "oa NULL\n");
647 oinfo->oi_oa->o_size = start;
648 oinfo->oi_oa->o_blocks = end;
649 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
651 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
654 /* Find and locally cancel the locks matched by @mode in the resource whose
655 * name is built from the object id in @oa. Found locks are added to the
656 * @cancels list. Returns the number of locks added to the @cancels list. */
657 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
658 struct list_head *cancels,
659 ldlm_mode_t mode, int lock_flags)
661 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
662 struct ldlm_res_id res_id;
663 struct ldlm_resource *res;
667 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
668 * export) but disabled through procfs (flag in NS).
670 * This distinguishes from a case when ELC is not supported originally,
671 * when we still want to cancel locks in advance and just cancel them
672 * locally, without sending any RPC. */
673 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* look up the resource without creating it: the last two arguments of
 * ldlm_resource_get() are 0 here */
676 ostid_build_res_name(&oa->o_oi, &res_id);
677 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* the resource is referenced for the duration of the local cancel */
681 LDLM_RESOURCE_ADDREF(res);
682 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
683 lock_flags, 0, NULL);
684 LDLM_RESOURCE_DELREF(res);
685 ldlm_resource_putref(res);
/* Reply interpreter for throttled destroy requests: releases the in-flight
 * slot taken by osc_can_send_destroy() by decrementing
 * cl_destroy_in_flight, then wakes waiters on cl_destroy_waitq so that a
 * blocked osc_destroy() can retry. */
689 static int osc_destroy_interpret(const struct lu_env *env,
690 struct ptlrpc_request *req, void *data,
693 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
695 atomic_dec(&cli->cl_destroy_in_flight);
696 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one more in-flight destroy RPC.
 *
 * cl_destroy_in_flight is incremented optimistically.  When the new value
 * exceeds cl_max_rpcs_in_flight the increment is undone; if the value seen
 * after the decrement is below the limit, another request finished in the
 * meantime and a waiter on cl_destroy_waitq is woken.
 * NOTE(review): the return statements are not visible here; osc_destroy()
 * in this file treats a non-zero result as permission to send. */
700 static int osc_can_send_destroy(struct client_obd *cli)
702 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
703 cli->cl_max_rpcs_in_flight) {
704 /* The destroy request can be sent */
707 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
708 cli->cl_max_rpcs_in_flight) {
710 * The counter has been modified between the two atomic
713 wake_up(&cli->cl_destroy_waitq);
/* Object creation entry point.
 *
 * Requires OBD_MD_FLGROUP to be valid in oa.  The request is forwarded to
 * osc_real_create() in two visible cases: when o_flags equals
 * OBD_FL_RECREATE_OBJS (with OBD_MD_FLFLAGS valid), and when the sequence
 * of the object id is not an MDT sequence according to fid_seq_is_mdt().
 * NOTE(review): the code after the final comment is not visible here. */
718 int osc_create(const struct lu_env *env, struct obd_export *exp,
719 struct obdo *oa, struct lov_stripe_md **ea,
720 struct obd_trans_info *oti)
727 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
729 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
730 oa->o_flags == OBD_FL_RECREATE_OBJS) {
731 RETURN(osc_real_create(exp, oa, ea, oti));
734 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
735 RETURN(osc_real_create(exp, oa, ea, oti));
737 /* we should not get here anymore */
743 /* Destroy requests can be async always on the client, and we don't even really
744 * care about the return code since the client cannot do anything at all about
 * it.
746 * When the MDS is unlinking a filename, it saves the file objects into a
747 * recovery llog, and these object records are cancelled when the OST reports
748 * they were destroyed and sync'd to disk (i.e. transaction committed).
749 * If the client dies, or the OST is down when the object should be destroyed,
750 * the records are not cancelled, and when the OST reconnects to the MDS next,
751 * it will retrieve the llog unlink logs and then sends the log cancellation
752 * cookies to the MDS after committing destroy transactions.
 *
 * Visible steps: unused PW locks on the object are collected for early
 * cancellation with LDLM_FL_DISCARD_DATA; the request is prepared with
 * ldlm_prep_elc_req() and routed to OST_IO_PORTAL; the first log cookie of
 * oti is copied into the obdo when OBD_MD_FLCOOKIE is valid; unless the
 * obdo carries OBD_FL_DELORPHAN the number of in-flight destroys is
 * throttled through osc_can_send_destroy(); finally the request is handed
 * to ptlrpcd without waiting for the reply. */
753 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
754 struct obdo *oa, struct lov_stripe_md *ea,
755 struct obd_trans_info *oti, struct obd_export *md_export,
758 struct client_obd *cli = &exp->exp_obd->u.cli;
759 struct ptlrpc_request *req;
760 struct ost_body *body;
766 CDEBUG(D_INFO, "oa NULL\n");
770 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
771 LDLM_FL_DISCARD_DATA);
773 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* the collected locks are put back on this path */
775 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
780 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
783 ptlrpc_request_free(req);
787 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
788 ptlrpc_at_set_req_timeout(req);
790 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
791 oa->o_lcookie = *oti->oti_logcookies;
792 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
794 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
796 osc_pack_capa(req, body, (struct obd_capa *)capa);
797 ptlrpc_request_set_replen(req);
799 /* If osc_destroy is for destroying the unlink orphan,
800 * sent from MDT to OST, which should not be blocked here,
801 * because the process might be triggered by ptlrpcd, and
802 * it is not good to block ptlrpcd thread (b=16006)*/
803 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
804 req->rq_interpret_reply = osc_destroy_interpret;
805 if (!osc_can_send_destroy(cli)) {
806 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810 * Wait until the number of on-going destroy RPCs drops
811 * under max_rpc_in_flight
813 l_wait_event_exclusive(cli->cl_destroy_waitq,
814 osc_can_send_destroy(cli), &lwi);
818 /* Do not wait for response */
819 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Fill the cache and grant accounting fields of an obdo from the client
 * state so that they can be announced to the server.
 *
 * Asserts that neither OBD_MD_FLBLOCKS nor OBD_MD_FLGRANT is already valid
 * in oa.  Under cl_loi_list_lock it copies cl_dirty into o_dirty, runs
 * three sanity checks on the dirty counters (each one logs with CERROR()),
 * and otherwise sets o_undirty to the larger of cl_dirty_max and a value
 * derived from cl_max_pages_per_rpc and cl_max_rpcs_in_flight + 1.
 * o_grant is set to available plus reserved grant, o_dropped takes the
 * lost grant, and cl_lost_grant is reset to 0.
 * NOTE(review): the statements inside the three error branches and the
 * update of o_valid are not visible here. */
823 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
826 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
828 LASSERT(!(oa->o_valid & bits));
831 client_obd_list_lock(&cli->cl_loi_list_lock);
832 oa->o_dirty = cli->cl_dirty;
/* check 1: per-client dirty bytes (minus those in transit) above the
 * per-client limit */
833 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
834 cli->cl_dirty_max)) {
835 CERROR("dirty %lu - %lu > dirty_max %lu\n",
836 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
/* check 2: the same test against the system-wide page counters */
838 } else if (unlikely(atomic_read(&obd_dirty_pages) -
839 atomic_read(&obd_dirty_transit_pages) >
840 (long)(obd_max_dirty_pages + 1))) {
841 /* The atomic_read() allowing the atomic_inc() are
842 * not covered by a lock thus they may safely race and trip
843 * this CERROR() unless we add in a small fudge factor (+1). */
844 CERROR("dirty %d - %d > system dirty_max %d\n",
845 atomic_read(&obd_dirty_pages),
846 atomic_read(&obd_dirty_transit_pages),
847 obd_max_dirty_pages);
/* check 3: the remaining headroom does not fit into 31 bits */
849 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
850 CERROR("dirty %lu - dirty_max %lu too big???\n",
851 cli->cl_dirty, cli->cl_dirty_max);
854 long max_in_flight = (cli->cl_max_pages_per_rpc <<
856 (cli->cl_max_rpcs_in_flight + 1);
857 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
859 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* the lost grant is reported once and then forgotten */
860 oa->o_dropped = cli->cl_lost_grant;
861 cli->cl_lost_grant = 0;
862 client_obd_list_unlock(&cli->cl_loi_list_lock);
863 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
864 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant shrink: cl_next_shrink_grant is set to
 * cfs_time_shift(cl_grant_shrink_interval) and the new value is logged at
 * D_CACHE level. */
868 void osc_update_next_shrink(struct client_obd *cli)
870 cli->cl_next_shrink_grant =
871 cfs_time_shift(cli->cl_grant_shrink_interval);
872 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
873 cli->cl_next_shrink_grant);
/* Add grant to the available grant of the client while holding
 * cl_loi_list_lock. */
876 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
878 client_obd_list_lock(&cli->cl_loi_list_lock);
879 cli->cl_avail_grant += grant;
880 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* Credit the grant carried in a reply body: when OBD_MD_FLGRANT is valid
 * in body->oa, its o_grant is logged and added to the available grant
 * through __osc_update_grant(). */
883 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
885 if (body->oa.o_valid & OBD_MD_FLGRANT) {
886 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
887 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() below calls this
 * function before its definition appears. */
891 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
892 obd_count keylen, void *key, obd_count vallen,
893 void *val, struct ptlrpc_request_set *set);
/* Reply interpreter for a grant shrink request.
 *
 * Two actions are visible: the grant recorded in the obdo of the request
 * arguments is added back to the available grant of the client, and the
 * grant carried in the reply body is credited with osc_update_grant().
 * NOTE(review): the conditions selecting these actions are not visible
 * here; presumably the first one is the failure path that restores the
 * grant that was given up - confirm. */
895 static int osc_shrink_grant_interpret(const struct lu_env *env,
896 struct ptlrpc_request *req,
899 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
900 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
901 struct ost_body *body;
904 __osc_update_grant(cli, oa->o_grant);
908 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
910 osc_update_grant(cli, body);
/* Give up a quarter of the available grant through an obdo.
 *
 * Under cl_loi_list_lock, o_grant is set to cl_avail_grant / 4 and that
 * amount is subtracted from cl_avail_grant.  OBD_MD_FLFLAGS is marked valid
 * if it was not, OBD_FL_SHRINK_GRANT is set in o_flags and the next shrink
 * time is rescheduled.
 * NOTE(review): the remainder of the OBD_MD_FLFLAGS branch is not visible
 * here. */
916 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
918 client_obd_list_lock(&cli->cl_loi_list_lock);
919 oa->o_grant = cli->cl_avail_grant / 4;
920 cli->cl_avail_grant -= oa->o_grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
922 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
923 oa->o_valid |= OBD_MD_FLFLAGS;
926 oa->o_flags |= OBD_FL_SHRINK_GRANT;
927 osc_update_next_shrink(cli);
930 /* Shrink the current grant, either from some large amount to enough for a
931 * full set of in-flight RPCs, or if we have already shrunk to that limit
932 * then to enough for a single RPC. This avoids keeping more grant than
933 * needed, and avoids shrinking the grant piecemeal.
 *
 * The first target is (cl_max_rpcs_in_flight + 1) RPCs worth of bytes; the
 * available grant is compared against it under cl_loi_list_lock, and the
 * shrink itself is delegated to osc_shrink_grant_to_target(), whose return
 * value is returned. */
934 static int osc_shrink_grant(struct client_obd *cli)
936 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
937 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
939 client_obd_list_lock(&cli->cl_loi_list_lock);
/* already at or below the multi-RPC target: aim for a single RPC */
940 if (cli->cl_avail_grant <= target_bytes)
941 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink the available grant of the client down to target_bytes and tell
 * the server about the amount given up.
 *
 * target_bytes is raised to at least one RPC worth of bytes.  Nothing is
 * done when the target is not below the available grant.  Otherwise the
 * cache state is announced into an ost_body, the surplus
 * (cl_avail_grant - target_bytes) is stored in o_grant, cl_avail_grant is
 * lowered to the target, OBD_FL_SHRINK_GRANT is set and the body is sent
 * with osc_set_info_async() under KEY_GRANT_SHRINK.  The surplus is added
 * back with __osc_update_grant() on a path that follows the send.
 * NOTE(review): the allocation and release of body, the condition for
 * restoring the grant (presumably a failed send - confirm) and the return
 * statements are not visible here.  The lock is dropped between the check
 * and the update, so cl_avail_grant may change in between. */
947 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
950 struct ost_body *body;
953 client_obd_list_lock(&cli->cl_loi_list_lock);
954 /* Don't shrink if we are already above or below the desired limit
955 * We don't want to shrink below a single RPC, as that will negatively
956 * impact block allocation and long-term performance. */
957 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
958 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
960 if (target_bytes >= cli->cl_avail_grant) {
961 client_obd_list_unlock(&cli->cl_loi_list_lock);
964 client_obd_list_unlock(&cli->cl_loi_list_lock);
970 osc_announce_cached(cli, &body->oa, 0);
972 client_obd_list_lock(&cli->cl_loi_list_lock);
/* hand the surplus over to the server and keep only the target */
973 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
974 cli->cl_avail_grant = target_bytes;
975 client_obd_list_unlock(&cli->cl_loi_list_lock);
976 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
977 body->oa.o_valid |= OBD_MD_FLFLAGS;
978 body->oa.o_flags = 0;
980 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
981 osc_update_next_shrink(cli);
983 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
984 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
985 sizeof(*body), body, NULL);
987 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether the grant of a client is due for shrinking.
 *
 * The connect flags must include OBD_CONNECT_GRANT_SHRINK.  Once the
 * current time is within 5 ticks of, or past, cl_next_shrink_grant, the
 * import state and the available grant are examined: a fully connected
 * import (LUSTRE_IMP_FULL) holding more grant than one RPC worth of bytes
 * is the positive case; the next shrink time is rescheduled on another
 * path.
 * NOTE(review): the return statements are not visible here;
 * osc_grant_shrink_grant_cb() treats a non-zero result as "shrink now". */
992 static int osc_should_shrink_grant(struct client_obd *client)
994 cfs_time_t time = cfs_time_current();
995 cfs_time_t next_shrink = client->cl_next_shrink_grant;
997 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
998 OBD_CONNECT_GRANT_SHRINK) == 0)
1001 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1002 /* Get the current RPC size directly, instead of going via:
1003 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1004 * Keep comment here so that it can be found by searching. */
1005 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1007 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1008 client->cl_avail_grant > brw_size)
1011 osc_update_next_shrink(client);
/* Timeout callback: walks every client linked on the ti_obd_list of the
 * timeout item (through cl_grant_shrink_list) and shrinks the grant of
 * each client for which osc_should_shrink_grant() says so.  The return
 * value of osc_shrink_grant() is ignored.  data is not referenced in the
 * visible lines. */
1016 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1018 struct client_obd *client;
1020 list_for_each_entry(client, &item->ti_obd_list,
1021 cl_grant_shrink_list) {
1022 if (osc_should_shrink_grant(client))
1023 osc_shrink_grant(client);
/* Register a client for periodic grant shrinking.
 *
 * Adds the client to the ptlrpc timeout machinery with its
 * cl_grant_shrink_interval and osc_grant_shrink_grant_cb() as callback,
 * linking it through cl_grant_shrink_list.  A failure is logged with
 * CERROR(); on success a debug message is logged and the first shrink time
 * is scheduled.
 * NOTE(review): one argument of ptlrpc_add_timeout_client() and the return
 * statements are not visible here. */
1028 static int osc_add_shrink_grant(struct client_obd *client)
1032 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1034 osc_grant_shrink_grant_cb, NULL,
1035 &client->cl_grant_shrink_list);
1037 CERROR("add grant client %s error %d\n",
1038 client->cl_import->imp_obd->obd_name, rc);
1041 CDEBUG(D_CACHE, "add grant client %s \n",
1042 client->cl_import->imp_obd->obd_name);
1043 osc_update_next_shrink(client);
/* Unregister a client from periodic grant shrinking: returns the result of
 * ptlrpc_del_timeout_client() on the cl_grant_shrink_list of the client.
 * NOTE(review): the remaining argument(s) of the call are not visible
 * here. */
1047 static int osc_del_shrink_grant(struct client_obd *client)
1049 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the grant accounting of a client from the connect data.
 *
 * Under cl_loi_list_lock: cl_avail_grant is set to ocd_grant when the
 * import state is LUSTRE_IMP_EVICTED, and ocd_grant - cl_dirty is the other
 * value assigned.  A negative result is reported with CWARN() and replaced
 * by ocd_grant.  cl_chunkbits becomes the larger of PAGE_CACHE_SHIFT and
 * ocd_blocksize.
 * After the lock is dropped the new values are logged, and the client is
 * registered for periodic grant shrinking when OBD_CONNECT_GRANT_SHRINK is
 * among the connect flags and cl_grant_shrink_list is still empty; the
 * return value of osc_add_shrink_grant() is ignored. */
1053 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1056 * ocd_grant is the total grant amount we're expect to hold: if we've
1057 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1058 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1060 * race is tolerable here: if we're evicted, but imp_state already
1061 * left EVICTED state, then cl_dirty must be 0 already.
1063 client_obd_list_lock(&cli->cl_loi_list_lock);
1064 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1065 cli->cl_avail_grant = ocd->ocd_grant;
1067 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1069 if (cli->cl_avail_grant < 0) {
1070 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1071 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1072 ocd->ocd_grant, cli->cl_dirty);
1073 /* workaround for servers which do not have the patch from
1075 cli->cl_avail_grant = ocd->ocd_grant;
1078 /* determine the appropriate chunk size used by osc_extent. */
1079 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1080 client_obd_list_unlock(&cli->cl_loi_list_lock);
1082 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1083 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1084 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1086 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1087 list_empty(&cli->cl_grant_shrink_list))
1088 osc_add_shrink_grant(cli);
1091 /* We assume that the reason this OSC got a short read is because it read
1092 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1093 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1094 * this stripe never got written at or beyond this stripe offset yet.
 *
 * nob_read is the number of bytes actually read, page_count the number of
 * entries in pga.  Pages fully covered by nob_read are skipped, the tail of
 * the page containing the end of the data is zeroed, and every remaining
 * page is zeroed over its pga[i]->count bytes.
 * NOTE(review): the index and counter updates and the kunmap() calls that
 * would pair with kmap() are not visible here. */
1095 static void handle_short_read(int nob_read, obd_count page_count,
1096 struct brw_page **pga)
1101 /* skip bytes read OK */
1102 while (nob_read > 0) {
1103 LASSERT (page_count > 0);
1105 if (pga[i]->count > nob_read) {
1106 /* EOF inside this page */
1107 ptr = kmap(pga[i]->pg) +
1108 (pga[i]->off & ~CFS_PAGE_MASK);
1109 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116 nob_read -= pga[i]->count;
1121 /* zero remaining pages */
1122 while (page_count-- > 0) {
1123 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1124 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes of a BRW_WRITE reply.
 *
 * Fetches the RMF_RCS vector from the reply; a missing or short vector is
 * logged.  The first negative entry is returned as the error.  A non-zero
 * non-negative entry is logged as invalid.  Finally the number of bytes
 * moved by the bulk transfer is compared with requested_nob and a mismatch
 * is logged with CERROR().
 * NOTE(review): the return values of the logging-only paths and of the
 * success path are not visible here. */
1130 static int check_write_rcs(struct ptlrpc_request *req,
1131 int requested_nob, int niocount,
1132 obd_count page_count, struct brw_page **pga)
1137 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1138 sizeof(*remote_rcs) *
1140 if (remote_rcs == NULL) {
1141 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1145 /* return error if any niobuf was in error */
1146 for (i = 0; i < niocount; i++) {
1147 if ((int)remote_rcs[i] < 0)
1148 return(remote_rcs[i]);
1150 if (remote_rcs[i] != 0) {
1151 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1152 i, remote_rcs[i], req);
1157 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1158 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1159 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether two brw pages can share one remote niobuf.
 *
 * When the flags differ, a warning is printed if they differ in any bit
 * outside the set FROM_GRANT, NOCACHE, SYNC, ASYNC and NOQUOTA.  The
 * visible return statement reports whether p2 starts exactly where p1
 * ends (p1->off + p1->count == p2->off).
 * NOTE(review): the return statement of the differing-flags branch is not
 * visible here. */
1166 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1168 if (p1->flag != p2->flag) {
/* mask clears the flag bits that are known to be safe to combine */
1169 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1170 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1172 /* warn if we try to combine flags that we don't know to be
1173 * safe to combine */
1174 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1175 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1176 "report this at http://bugs.whamcloud.com/\n",
1177 p1->flag, p2->flag);
1182 return (p1->off + p1->count == p2->off);
/* Compute the checksum of the first nob bytes of a bulk page array.
 *
 * nob         number of bytes to checksum
 * pg_count    number of entries in pga (asserted to be non-zero)
 * pga         pages with their in-page offsets and byte counts
 * opc         OST_READ or OST_WRITE; only used for fault injection
 * cksum_type  checksum algorithm, mapped with cksum_obd2cfs()
 *
 * The pages are fed to a cfs_crypto hash one by one, each contributing
 * min(pga[i]->count, nob) bytes, and the digest is read with
 * cfs_crypto_hash_final().  Two fault injection hooks exist: on reads the
 * first page can be overwritten with "bad1" before hashing, and on writes
 * a wrong checksum can be produced without touching the data.
 * NOTE(review): when the hash cannot be initialised, PTR_ERR(hdesc) is
 * returned through the obd_count return type, so no separate error channel
 * is visible to the caller.  The loop counter updates, the kunmap() call
 * and the final return statement are not visible here. */
1185 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1186 struct brw_page **pga, int opc,
1187 cksum_type_t cksum_type)
1191 struct cfs_crypto_hash_desc *hdesc;
1192 unsigned int bufsize;
1194 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1196 LASSERT(pg_count > 0);
1198 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1199 if (IS_ERR(hdesc)) {
1200 CERROR("Unable to initialize checksum hash %s\n",
1201 cfs_crypto_hash_name(cfs_alg));
1202 return PTR_ERR(hdesc);
1205 while (nob > 0 && pg_count > 0) {
/* the last page may only be partially covered by nob */
1206 int count = pga[i]->count > nob ? nob : pga[i]->count;
1208 /* corrupt the data before we compute the checksum, to
1209 * simulate an OST->client data error */
1210 if (i == 0 && opc == OST_READ &&
1211 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1212 unsigned char *ptr = kmap(pga[i]->pg);
1213 int off = pga[i]->off & ~CFS_PAGE_MASK;
1214 memcpy(ptr + off, "bad1", min(4, nob));
1217 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1218 pga[i]->off & ~CFS_PAGE_MASK,
1220 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1221 (int)(pga[i]->off & ~CFS_PAGE_MASK));
1223 nob -= pga[i]->count;
1229 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1232 cfs_crypto_hash_final(hdesc, NULL, NULL);
1234 /* For sending we only compute the wrong checksum instead
1235 * of corrupting the data so it is still correct on a redo */
1236 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a bulk read/write (BRW) ptlrpc request for the @page_count pages
 * in @pga and hand it back to the caller (the parameter list includes @reqp).
 *
 * Steps visible in this listing:
 *  - two fault-injection checks that fail early with -ENOMEM or -EINVAL;
 *  - request allocation (from the import request pool when OBD_BRW_WRITE is
 *    set in @cmd, otherwise with the RQF_OST_BRW_READ format);
 *  - sizing of the ioobj and remote niobuf buffers from niocount, then
 *    packing the request and preparing the bulk descriptor;
 *  - a per-page loop that asserts the page array has no gaps and is sorted
 *    by offset, pins each page into the bulk descriptor and merges adjacent
 *    pages into one niobuf when can_merge_pages() allows it;
 *  - checksum setup: for OST_WRITE the bulk checksum is computed and stored
 *    in both the wire obdo and @oa when cli->cl_checksum is set and the
 *    sptlrpc flavor has no bulk protection;
 *  - initialisation of the async args (requested_nob, nio_count, page_count,
 *    aa_oaps and, when both @ocapa and @reserve are set, a capa reference).
 *
 * NOTE(review): this listing omits a number of source lines (the end of the
 * parameter list, several braces and else keywords, the niocount increment
 * and the final return), so control flow around the gaps must be confirmed
 * against the complete file.
 */
1242 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1243 struct lov_stripe_md *lsm, obd_count page_count,
1244 struct brw_page **pga,
1245 struct ptlrpc_request **reqp,
1246 struct obd_capa *ocapa, int reserve,
1249 struct ptlrpc_request *req;
1250 struct ptlrpc_bulk_desc *desc;
1251 struct ost_body *body;
1252 struct obd_ioobj *ioobj;
1253 struct niobuf_remote *niobuf;
1254 int niocount, i, requested_nob, opc, rc;
1255 struct osc_brw_async_args *aa;
1256 struct req_capsule *pill;
1257 struct brw_page *pg_prev;
1260 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1261 RETURN(-ENOMEM); /* Recoverable */
1262 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1263 RETURN(-EINVAL); /* Fatal */
/* Writes allocate from the import request pool; the plain allocation with
 * the read format below is presumably the else branch (not in this listing) */
1265 if ((cmd & OBD_BRW_WRITE) != 0) {
1267 req = ptlrpc_request_alloc_pool(cli->cl_import,
1268 cli->cl_import->imp_rq_pool,
1269 &RQF_OST_BRW_WRITE);
1272 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1277 for (niocount = i = 1; i < page_count; i++) {
1278 if (!can_merge_pages(pga[i - 1], pga[i]))
1282 pill = &req->rq_pill;
1283 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1285 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1286 niocount * sizeof(*niobuf));
1287 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1289 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1291 ptlrpc_request_free(req);
1294 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1295 ptlrpc_at_set_req_timeout(req);
1296 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1298 req->rq_no_retry_einprogress = 1;
1300 desc = ptlrpc_prep_bulk_imp(req, page_count,
1301 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1302 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1306 GOTO(out, rc = -ENOMEM);
1307 /* NB request now owns desc and will free it when it gets freed */
1309 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1310 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1311 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1314 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1316 obdo_to_ioobj(oa, ioobj);
1317 ioobj->ioo_bufcnt = niocount;
1318 /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1319 * bulks that might be sent for this request. The actual number is decided
1320 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1321 * "max - 1" for old client compatibility sending "0", and also so the
1322 * actual maximum is a power-of-two number, not one less. LU-1431 */
1323 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1324 osc_pack_capa(req, body, ocapa);
1325 LASSERT(page_count > 0);
/* Per page: check the layout assertions, pin the page into the bulk
 * descriptor, then extend the current niobuf or start a new one */
1327 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1328 struct brw_page *pg = pga[i];
1329 int poff = pg->off & ~CFS_PAGE_MASK;
1331 LASSERT(pg->count > 0);
1332 /* make sure there is no gap in the middle of page array */
1333 LASSERTF(page_count == 1 ||
1334 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1335 ergo(i > 0 && i < page_count - 1,
1336 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1337 ergo(i == page_count - 1, poff == 0)),
1338 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1339 i, page_count, pg, pg->off, pg->count);
1340 LASSERTF(i == 0 || pg->off > pg_prev->off,
1341 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1342 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1344 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1345 pg_prev->pg, page_private(pg_prev->pg),
1346 pg_prev->pg->index, pg_prev->off);
1347 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1348 (pg->flag & OBD_BRW_SRVLOCK));
1350 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1351 requested_nob += pg->count;
1353 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1355 niobuf->len += pg->count;
1357 niobuf->offset = pg->off;
1358 niobuf->len = pg->count;
1359 niobuf->flags = pg->flag;
1364 LASSERTF((void *)(niobuf - niocount) ==
1365 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1366 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1367 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1369 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1371 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1372 body->oa.o_valid |= OBD_MD_FLFLAGS;
1373 body->oa.o_flags = 0;
1375 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1378 if (osc_should_shrink_grant(cli))
1379 osc_shrink_grant_local(cli, &body->oa);
/* Writes compute and record the bulk checksum here; the second checksum
 * test further down only sets the request flags and is presumably the
 * read branch (the else line is not in this listing) */
1381 /* size[REQ_REC_OFF] still sizeof (*body) */
1382 if (opc == OST_WRITE) {
1383 if (cli->cl_checksum &&
1384 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1385 /* store cl_cksum_type in a local variable since
1386 * it can be changed via lprocfs */
1387 cksum_type_t cksum_type = cli->cl_cksum_type;
1389 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1390 oa->o_flags &= OBD_FL_LOCAL_MASK;
1391 body->oa.o_flags = 0;
1393 body->oa.o_flags |= cksum_type_pack(cksum_type);
1394 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1395 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1399 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1401 /* save this in 'oa', too, for later checking */
1402 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1403 oa->o_flags |= cksum_type_pack(cksum_type);
1405 /* clear out the checksum flag, in case this is a
1406 * resend but cl_checksum is no longer set. b=11238 */
1407 oa->o_valid &= ~OBD_MD_FLCKSUM;
1409 oa->o_cksum = body->oa.o_cksum;
1410 /* 1 RC per niobuf */
1411 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1412 sizeof(__u32) * niocount);
1414 if (cli->cl_checksum &&
1415 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1416 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1417 body->oa.o_flags = 0;
1418 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1419 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1422 ptlrpc_request_set_replen(req);
/* The async args live inside the request; the compile-time assert checks
 * that they fit into rq_async_args */
1424 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1425 aa = ptlrpc_req_async_args(req);
1427 aa->aa_requested_nob = requested_nob;
1428 aa->aa_nio_count = niocount;
1429 aa->aa_page_count = page_count;
1433 INIT_LIST_HEAD(&aa->aa_oaps);
1434 if (ocapa && reserve)
1435 aa->aa_ocapa = capa_get(ocapa);
1441 ptlrpc_req_finished(req);
/*
 * Compare the checksum the client computed for a bulk write
 * (@client_cksum) with the one returned by the server (@server_cksum).
 *
 * Equal checksums only produce a D_PAGE debug message.  On a mismatch the
 * checksum type is unpacked from @oa, the checksum is recomputed over @pga
 * with osc_checksum_bulk() and the recomputed value is compared with both
 * inputs to choose a diagnostic: wrong checksum type used by the server,
 * data changed on the client after checksumming, data changed in transit,
 * or neither value matches.  The diagnostic is printed on the console
 * together with the peer nid, parent FID, object id and byte extent.
 *
 * NOTE(review): the return statements are not part of this listing.  The
 * caller in osc_brw_fini_request() uses the result as a truth value, so a
 * non-zero result presumably means the check failed -- confirm.
 */
1445 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1446 __u32 client_cksum, __u32 server_cksum, int nob,
1447 obd_count page_count, struct brw_page **pga,
1448 cksum_type_t client_cksum_type)
1452 cksum_type_t cksum_type;
1454 if (server_cksum == client_cksum) {
1455 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1459 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1461 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Pick the message by comparing the recomputed checksum with both sides */
1464 if (cksum_type != client_cksum_type)
1465 msg = "the server did not use the checksum type specified in "
1466 "the original request - likely a protocol problem";
1467 else if (new_cksum == server_cksum)
1468 msg = "changed on the client after we checksummed it - "
1469 "likely false positive due to mmap IO (bug 11742)";
1470 else if (new_cksum == client_cksum)
1471 msg = "changed in transit before arrival at OST";
1473 msg = "changed in transit AND doesn't match the original - "
1474 "likely false positive due to mmap IO (bug 11742)";
1476 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1477 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1478 msg, libcfs_nid2str(peer->nid),
1479 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1480 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1481 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1482 POSTID(&oa->o_oi), pga[0]->off,
1483 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1484 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1485 "client csum now %x\n", client_cksum, client_cksum_type,
1486 server_cksum, cksum_type, new_cksum);
/*
 * Post-process a BRW request once its reply (or failure) is known.
 *
 * Steps visible in this listing:
 *  - a negative @rc other than -EDQUOT is logged as a failed request;
 *  - the reply ost_body is unpacked;
 *  - for OST_WRITE replies carrying user or group quota flags the uid/gid
 *    pair is passed to osc_quota_setdq();
 *  - the grant is updated with osc_update_grant();
 *  - writes: the bulk is unwrapped, the write checksum is verified with
 *    check_write_checksum() and the per-niobuf return codes are checked
 *    with check_write_rcs();
 *  - reads: the bulk is unwrapped, the byte count is checked against the
 *    requested and transferred sizes, a short read is handled, and the
 *    checksum is recomputed and compared with the one from the server;
 *  - finally the reply obdo is copied into aa->aa_oa.
 *
 * NOTE(review): most RETURN/GOTO lines of the error paths are missing from
 * this listing; read the exact error codes from the complete file.
 */
1490 /* Note rc enters this function as number of bytes transferred */
1491 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1493 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1494 const lnet_process_id_t *peer =
1495 &req->rq_import->imp_connection->c_peer;
1496 struct client_obd *cli = aa->aa_cli;
1497 struct ost_body *body;
1498 __u32 client_cksum = 0;
1501 if (rc < 0 && rc != -EDQUOT) {
1502 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1506 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1509 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1513 /* set/clear over quota flag for a uid/gid */
1514 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1515 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1516 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1518 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1519 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1521 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1524 osc_update_grant(cli, body);
1529 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1530 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1532 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1534 CERROR("Unexpected +ve rc %d\n", rc);
1537 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1539 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1542 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1543 check_write_checksum(&body->oa, peer, client_cksum,
1544 body->oa.o_cksum, aa->aa_requested_nob,
1545 aa->aa_page_count, aa->aa_ppga,
1546 cksum_type_unpack(aa->aa_oa->o_flags)))
1549 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1550 aa->aa_page_count, aa->aa_ppga);
1554 /* The rest of this function executes only for OST_READs */
1556 /* if unwrap_bulk failed, return -EAGAIN to retry */
1557 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1559 GOTO(out, rc = -EAGAIN);
1561 if (rc > aa->aa_requested_nob) {
1562 CERROR("Unexpected rc %d (%d requested)\n", rc,
1563 aa->aa_requested_nob);
1567 if (rc != req->rq_bulk->bd_nob_transferred) {
1568 CERROR ("Unexpected rc %d (%d transferred)\n",
1569 rc, req->rq_bulk->bd_nob_transferred);
1573 if (rc < aa->aa_requested_nob)
1574 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* The server sent a checksum: recompute it locally over the received
 * pages and compare the two values */
1576 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1577 static int cksum_counter;
1578 __u32 server_cksum = body->oa.o_cksum;
1581 cksum_type_t cksum_type;
1583 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1584 body->oa.o_flags : 0);
1585 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1586 aa->aa_ppga, OST_READ,
1589 if (peer->nid == req->rq_bulk->bd_sender) {
1593 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1596 if (server_cksum == ~0 && rc > 0) {
1597 CERROR("Protocol error: server %s set the 'checksum' "
1598 "bit, but didn't send a checksum. Not fatal, "
1599 "but please notify on http://bugs.whamcloud.com/\n",
1600 libcfs_nid2str(peer->nid));
1601 } else if (server_cksum != client_cksum) {
1602 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1603 "%s%s%s inode "DFID" object "DOSTID
1604 " extent ["LPU64"-"LPU64"]\n",
1605 req->rq_import->imp_obd->obd_name,
1606 libcfs_nid2str(peer->nid),
1608 body->oa.o_valid & OBD_MD_FLFID ?
1609 body->oa.o_parent_seq : (__u64)0,
1610 body->oa.o_valid & OBD_MD_FLFID ?
1611 body->oa.o_parent_oid : 0,
1612 body->oa.o_valid & OBD_MD_FLFID ?
1613 body->oa.o_parent_ver : 0,
1614 POSTID(&body->oa.o_oi),
1615 aa->aa_ppga[0]->off,
1616 aa->aa_ppga[aa->aa_page_count-1]->off +
1617 aa->aa_ppga[aa->aa_page_count-1]->count -
1619 CERROR("client %x, server %x, cksum_type %x\n",
1620 client_cksum, server_cksum, cksum_type);
1622 aa->aa_oa->o_cksum = client_cksum;
1626 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1629 } else if (unlikely(client_cksum)) {
1630 static int cksum_missed;
1633 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1634 CERROR("Checksum %u requested from %s but not sent\n",
1635 cksum_missed, libcfs_nid2str(peer->nid));
1641 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1642 aa->aa_oa, &body->oa);
/*
 * Synchronous BRW for one chunk of pages: build the request with
 * osc_brw_prep_request(), send it with ptlrpc_queue_wait() and post-process
 * the reply with osc_brw_fini_request().
 *
 * The import generation is sampled before the first attempt and stored in
 * the request.  When the result is a recoverable error the visible code
 * gives up once client_should_resend() refuses another attempt (never for
 * -EINPROGRESS), logs a resend that crosses an eviction, and otherwise
 * sleeps with a timeout of @resends seconds.
 *
 * NOTE(review): the retry jump, the resend counter update and the final
 * rewrite of rc for -EAGAIN or -EINPROGRESS are on lines missing from this
 * listing; confirm them in the complete file.
 */
1647 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1648 struct lov_stripe_md *lsm,
1649 obd_count page_count, struct brw_page **pga,
1650 struct obd_capa *ocapa)
1652 struct ptlrpc_request *req;
1654 wait_queue_head_t waitq;
1655 int generation, resends = 0;
1656 struct l_wait_info lwi;
1660 init_waitqueue_head(&waitq);
1661 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1664 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1665 page_count, pga, &req, ocapa, 0, resends);
1670 req->rq_generation_set = 1;
1671 req->rq_import_generation = generation;
1672 req->rq_sent = cfs_time_current_sec() + resends;
1675 rc = ptlrpc_queue_wait(req);
1677 if (rc == -ETIMEDOUT && req->rq_resend) {
1678 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1679 ptlrpc_req_finished(req);
1683 rc = osc_brw_fini_request(req, rc);
1685 ptlrpc_req_finished(req);
1686 /* When server return -EINPROGRESS, client should always retry
1687 * regardless of the number of times the bulk was resent already.*/
1688 if (osc_recoverable_error(rc)) {
1690 if (rc != -EINPROGRESS &&
1691 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1692 CERROR("%s: too many resend retries for object: "
1693 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1694 POSTID(&oa->o_oi), rc);
1698 exp->exp_obd->u.cli.cl_import->imp_generation) {
1699 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1700 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1701 POSTID(&oa->o_oi), rc);
1705 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1707 l_wait_event(waitq, 0, &lwi);
1712 if (rc == -EAGAIN || rc == -EINPROGRESS)
/*
 * Re-issue an asynchronous BRW after a recoverable error @rc.
 *
 * A new request is built with osc_brw_prep_request() from the client, obdo,
 * pages and capa recorded in @aa.  The interpret callback, the async args
 * and the import generation are copied from @request; the oap and extent
 * lists are spliced over to the new async args (a list_head cannot simply
 * be copied); each oap that held a reference on the old request drops it
 * and takes one on the new request; the capa pointer moves to the new
 * request.  The new request is then queued on ptlrpcd.
 *
 * The send time is delayed by aa_resends seconds, capped at the timeout of
 * the new request.  If an oap referencing @request was interrupted the new
 * request is released instead (the value returned on that path is not in
 * this listing).
 */
1717 static int osc_brw_redo_request(struct ptlrpc_request *request,
1718 struct osc_brw_async_args *aa, int rc)
1720 struct ptlrpc_request *new_req;
1721 struct osc_brw_async_args *new_aa;
1722 struct osc_async_page *oap;
1725 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1726 "redo for recoverable error %d", rc);
1728 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1729 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1730 aa->aa_cli, aa->aa_oa,
1731 NULL /* lsm unused by osc currently */,
1732 aa->aa_page_count, aa->aa_ppga,
1733 &new_req, aa->aa_ocapa, 0, 1);
1737 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1738 if (oap->oap_request != NULL) {
1739 LASSERTF(request == oap->oap_request,
1740 "request %p != oap_request %p\n",
1741 request, oap->oap_request);
1742 if (oap->oap_interrupted) {
1743 ptlrpc_req_finished(new_req);
1748 /* New request takes over pga and oaps from old request.
1749 * Note that copying a list_head doesn't work, need to move it... */
1751 new_req->rq_interpret_reply = request->rq_interpret_reply;
1752 new_req->rq_async_args = request->rq_async_args;
1753 /* cap resend delay to the current request timeout, this is similar to
1754 * what ptlrpc does (see after_reply()) */
1755 if (aa->aa_resends > new_req->rq_timeout)
1756 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1758 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1759 new_req->rq_generation_set = 1;
1760 new_req->rq_import_generation = request->rq_import_generation;
1762 new_aa = ptlrpc_req_async_args(new_req);
1764 INIT_LIST_HEAD(&new_aa->aa_oaps);
1765 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1766 INIT_LIST_HEAD(&new_aa->aa_exts);
1767 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1768 new_aa->aa_resends = aa->aa_resends;
/* Move the request reference held by an oap from the old to the new request */
1770 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1771 if (oap->oap_request) {
1772 ptlrpc_req_finished(oap->oap_request);
1773 oap->oap_request = ptlrpc_request_addref(new_req);
1777 new_aa->aa_ocapa = aa->aa_ocapa;
1778 aa->aa_ocapa = NULL;
1780 /* XXX: This code will run into problem if we're going to support
1781 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1782 * and wait for all of them to be finished. We should inherit request
1783 * set from old request. */
1784 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1786 DEBUG_REQ(D_INFO, new_req, "new request");
1791 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1792 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1793 * fine for our small page arrays and doesn't require allocation. It's an
1794 * insertion sort that swaps elements that are strides apart, shrinking the
1795 * stride down until it's '1' and the array is sorted.
/*
 * Sort the @num brw_page pointers in @array in place by ascending ->off
 * using a shell sort.  The first loop grows the stride with
 * stride = 3 * stride + 1 until it reaches @num; the do/while loop then
 * runs an insertion pass per stride, moving entries with a larger offset
 * up by one stride, until the stride is 1.
 *
 * NOTE(review): the lines that shrink the stride and store the saved
 * element back are not in this listing.
 */
1797 static void sort_brw_pages(struct brw_page **array, int num)
1800 struct brw_page *tmp;
1804 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1809 for (i = stride ; i < num ; i++) {
1812 while (j >= stride && array[j - stride]->off > tmp->off) {
1813 array[j] = array[j - stride];
1818 } while (stride > 1);
/*
 * Count how many leading entries of @pg (at most @pages, which must be
 * greater than zero) can be sent together without a hole between pages.
 * The scan stops after a page whose data does not reach the end of the
 * page, and before a page whose data does not start at offset zero within
 * the page.
 *
 * NOTE(review): the loop construct and the counter updates are on lines
 * missing from this listing.
 */
1821 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1827 LASSERT (pages > 0);
1828 offset = pg[i]->off & ~CFS_PAGE_MASK;
1832 if (pages == 0) /* that's all */
1835 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1836 return count; /* doesn't end on page boundary */
1839 offset = pg[i]->off & ~CFS_PAGE_MASK;
1840 if (offset != 0) /* doesn't start on page boundary */
/*
 * Allocate an array of @count brw_page pointers for the flat array @pga.
 *
 * NOTE(review): the loop body, the allocation failure check and the return
 * statement are not in this listing; presumably entry i is set to the
 * address of pga[i] and NULL is returned when the allocation fails.
 */
1847 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1849 struct brw_page **ppga;
1852 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1856 for (i = 0; i < count; i++)
/*
 * Free a brw_page pointer array of @count entries.  @ppga must not be NULL
 * (asserted); only the pointer array itself is passed to OBD_FREE.
 */
1861 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1863 LASSERT(ppga != NULL);
1864 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Synchronous bulk I/O entry point for @page_count pages in @pga.
 *
 * With OBD_BRW_CHECK set in @cmd the caller only asks whether the I/O has a
 * chance to succeed, and the visible code tests imp->imp_invalid.
 * Otherwise a pointer array is built over @pga, sorted by offset and sent
 * in chunks: each chunk holds at most cl_max_pages_per_rpc pages and is
 * further trimmed by max_unfragmented_pages() before it is passed to
 * osc_brw_internal().
 *
 * Because a BRW clobbers the obdo, *oinfo->oi_oa is copied into saved_oa
 * when the first chunk does not cover all pages, and restored from that
 * copy before every later chunk.  The pointer array and the saved obdo are
 * released before returning.
 *
 * NOTE(review): @oti is not referenced on any line of this listing, and
 * the return statements are not shown.
 */
1867 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1868 obd_count page_count, struct brw_page *pga,
1869 struct obd_trans_info *oti)
1871 struct obdo *saved_oa = NULL;
1872 struct brw_page **ppga, **orig;
1873 struct obd_import *imp = class_exp2cliimp(exp);
1874 struct client_obd *cli;
1875 int rc, page_count_orig;
1878 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1879 cli = &imp->imp_obd->u.cli;
1881 if (cmd & OBD_BRW_CHECK) {
1882 /* The caller just wants to know if there's a chance that this
1883 * I/O can succeed */
1885 if (imp->imp_invalid)
1890 /* test_brw with a failed create can trip this, maybe others. */
1891 LASSERT(cli->cl_max_pages_per_rpc);
/* orig keeps the start of the array for the final free; ppga advances */
1895 orig = ppga = osc_build_ppga(pga, page_count);
1898 page_count_orig = page_count;
1900 sort_brw_pages(ppga, page_count);
1901 while (page_count) {
1902 obd_count pages_per_brw;
1904 if (page_count > cli->cl_max_pages_per_rpc)
1905 pages_per_brw = cli->cl_max_pages_per_rpc;
1907 pages_per_brw = page_count;
1909 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1911 if (saved_oa != NULL) {
1912 /* restore previously saved oa */
1913 *oinfo->oi_oa = *saved_oa;
1914 } else if (page_count > pages_per_brw) {
1915 /* save a copy of oa (brw will clobber it) */
1916 OBDO_ALLOC(saved_oa);
1917 if (saved_oa == NULL)
1918 GOTO(out, rc = -ENOMEM);
1919 *saved_oa = *oinfo->oi_oa;
1922 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1923 pages_per_brw, ppga, oinfo->oi_capa);
1928 page_count -= pages_per_brw;
1929 ppga += pages_per_brw;
1933 osc_release_ppga(orig, page_count_orig);
1935 if (saved_oa != NULL)
1936 OBDO_FREE(saved_oa);
/*
 * Reply interpreter for asynchronous BRW requests; osc_build_rpc() installs
 * it as rq_interpret_reply and @data carries the osc_brw_async_args.
 *
 * Steps visible in this listing:
 *  - post-process the reply with osc_brw_fini_request();
 *  - on a recoverable error: log a resend that crosses an eviction, or redo
 *    the request through osc_brw_redo_request() (always for -EINPROGRESS,
 *    otherwise while client_should_resend() allows), or log that the retry
 *    limit was hit;
 *  - drop the capa reference held in the async args;
 *  - finish every extent on aa_exts with osc_extent_finish(), remembering
 *    the cl_object of the first extent when rc is 0;
 *  - when an object was remembered, copy blocks and the m/a/c times that
 *    are flagged valid in the obdo into a cl_attr and apply it under the
 *    object attribute lock, then put the object;
 *  - free the obdo, complete the cl_req, release the page pointer array;
 *  - under cl_loi_list_lock decrement the read or write in-flight counter
 *    and wake cache waiters, then unplug further I/O.
 *
 * NOTE(review): several braces, else keywords and the return statements are
 * missing from this listing.
 */
1941 static int brw_interpret(const struct lu_env *env,
1942 struct ptlrpc_request *req, void *data, int rc)
1944 struct osc_brw_async_args *aa = data;
1945 struct osc_extent *ext;
1946 struct osc_extent *tmp;
1947 struct cl_object *obj = NULL;
1948 struct client_obd *cli = aa->aa_cli;
1951 rc = osc_brw_fini_request(req, rc);
1952 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1953 /* When server return -EINPROGRESS, client should always retry
1954 * regardless of the number of times the bulk was resent already. */
1955 if (osc_recoverable_error(rc)) {
1956 if (req->rq_import_generation !=
1957 req->rq_import->imp_generation) {
1958 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1959 ""DOSTID", rc = %d.\n",
1960 req->rq_import->imp_obd->obd_name,
1961 POSTID(&aa->aa_oa->o_oi), rc);
1962 } else if (rc == -EINPROGRESS ||
1963 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1964 rc = osc_brw_redo_request(req, aa, rc);
/* NOTE(review): the message below formats the POSTID() arguments with
 * LPU64:LPU64 while the sibling messages in this file use DOSTID, and it
 * says "resent" where osc_brw_internal() says "resend" -- worth aligning */
1966 CERROR("%s: too many resent retries for object: "
1967 ""LPU64":"LPU64", rc = %d.\n",
1968 req->rq_import->imp_obd->obd_name,
1969 POSTID(&aa->aa_oa->o_oi), rc);
1974 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1979 capa_put(aa->aa_ocapa);
1980 aa->aa_ocapa = NULL;
1983 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1984 if (obj == NULL && rc == 0) {
1985 obj = osc2cl(ext->oe_obj);
1989 list_del_init(&ext->oe_link);
1990 osc_extent_finish(env, ext, 1, rc);
1992 LASSERT(list_empty(&aa->aa_exts));
1993 LASSERT(list_empty(&aa->aa_oaps));
1996 struct obdo *oa = aa->aa_oa;
1997 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1998 unsigned long valid = 0;
2001 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2002 attr->cat_blocks = oa->o_blocks;
2003 valid |= CAT_BLOCKS;
2005 if (oa->o_valid & OBD_MD_FLMTIME) {
2006 attr->cat_mtime = oa->o_mtime;
2009 if (oa->o_valid & OBD_MD_FLATIME) {
2010 attr->cat_atime = oa->o_atime;
2013 if (oa->o_valid & OBD_MD_FLCTIME) {
2014 attr->cat_ctime = oa->o_ctime;
2018 cl_object_attr_lock(obj);
2019 cl_object_attr_set(env, obj, attr, valid);
2020 cl_object_attr_unlock(obj);
2022 cl_object_put(env, obj);
2024 OBDO_FREE(aa->aa_oa);
2026 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2027 req->rq_bulk->bd_nob_transferred);
2028 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2029 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2031 client_obd_list_lock(&cli->cl_loi_list_lock);
2032 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2033 * is called so we know whether to go to sync BRWs or wait for more
2034 * RPCs to complete */
2035 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2036 cli->cl_w_in_flight--;
2038 cli->cl_r_in_flight--;
2039 osc_wake_cache_waiters(cli);
2040 client_obd_list_unlock(&cli->cl_loi_list_lock);
2042 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2047 * Build an RPC from the list of extents @ext_list. The caller must ensure
2048 * that the total number of pages in this list does NOT exceed the max pages
2049 * per RPC. Extents in the list must be in OES_RPC state.
/*
 * Assemble one asynchronous BRW RPC from the extents on @ext_list and queue
 * it on ptlrpcd with policy @pol.
 *
 * Flow visible in this listing:
 *  - every oap of every extent is appended to a local rpc_list while the
 *    lowest and highest object offsets are tracked; each extent is asserted
 *    to be in OES_RPC state and its oe_memalloc flag is folded into
 *    mem_tight;
 *  - the request attributes and the page pointer array are allocated;
 *  - the cl_req is allocated from the first page, every page is added to
 *    it and its brw_page is stored in pga with the object offset filled in;
 *  - the obdo is filled through cl_req_attr_set() and, when a lock was
 *    recorded, its remote handle is stored in the obdo;
 *  - after cl_req_prep() the pages are sorted and the request is built with
 *    osc_brw_prep_request(); brw_interpret() becomes the reply interpreter;
 *  - the timestamps are refreshed, the job id is set, and the oap and
 *    extent lists are spliced into the async args;
 *  - under cl_loi_list_lock the read or write in-flight counter and the
 *    page, rpc and offset histograms are updated;
 *  - the request is handed to ptlrpcd.
 *
 * The tail of the function restores the memory pressure flag, frees the
 * request attributes and, presumably only when rc is non-zero (the test is
 * not in this listing), frees the page array, finishes every extent left
 * on @ext_list with the error and completes the cl_req.
 */
2051 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2052 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2054 struct ptlrpc_request *req = NULL;
2055 struct osc_extent *ext;
2056 struct brw_page **pga = NULL;
2057 struct osc_brw_async_args *aa = NULL;
2058 struct obdo *oa = NULL;
2059 struct osc_async_page *oap;
2060 struct osc_async_page *tmp;
2061 struct cl_req *clerq = NULL;
2062 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2064 struct ldlm_lock *lock = NULL;
2065 struct cl_req_attr *crattr = NULL;
2066 obd_off starting_offset = OBD_OBJECT_EOF;
2067 obd_off ending_offset = 0;
2073 LIST_HEAD(rpc_list);
2076 LASSERT(!list_empty(ext_list));
2078 /* add pages into rpc_list to build BRW rpc */
2079 list_for_each_entry(ext, ext_list, oe_link) {
2080 LASSERT(ext->oe_state == OES_RPC);
2081 mem_tight |= ext->oe_memalloc;
2082 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2084 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2085 if (starting_offset > oap->oap_obj_off)
2086 starting_offset = oap->oap_obj_off;
2088 LASSERT(oap->oap_page_off == 0);
2089 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2090 ending_offset = oap->oap_obj_off +
2093 LASSERT(oap->oap_page_off + oap->oap_count ==
2099 mpflag = cfs_memory_pressure_get_and_set();
2101 OBD_ALLOC(crattr, sizeof(*crattr));
2103 GOTO(out, rc = -ENOMEM);
2105 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2107 GOTO(out, rc = -ENOMEM);
2111 GOTO(out, rc = -ENOMEM);
/* Fill pga from the collected oaps; the cl_req is created on the first page */
2114 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2115 struct cl_page *page = oap2cl_page(oap);
2116 if (clerq == NULL) {
2117 clerq = cl_req_alloc(env, page, crt,
2118 1 /* only 1-object rpcs for now */);
2120 GOTO(out, rc = PTR_ERR(clerq));
2121 lock = oap->oap_ldlm_lock;
2124 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2125 pga[i] = &oap->oap_brw_page;
2126 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2127 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2128 pga[i]->pg, page_index(oap->oap_page), oap,
2131 cl_req_page_add(env, clerq, page);
2134 /* always get the data for the obdo for the rpc */
2135 LASSERT(clerq != NULL);
2136 crattr->cra_oa = oa;
2137 cl_req_attr_set(env, clerq, crattr, ~0ULL);
2139 oa->o_handle = lock->l_remote_handle;
2140 oa->o_valid |= OBD_MD_FLHANDLE;
2143 rc = cl_req_prep(env, clerq);
2145 CERROR("cl_req_prep failed: %d\n", rc);
2149 sort_brw_pages(pga, page_count);
2150 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2151 pga, &req, crattr->cra_capa, 1, 0);
2153 CERROR("prep_req failed: %d\n", rc);
2157 req->rq_interpret_reply = brw_interpret;
2160 req->rq_memalloc = 1;
2162 /* Need to update the timestamps after the request is built in case
2163 * we race with setattr (locally or in queue at OST). If OST gets
2164 * later setattr before earlier BRW (as determined by the request xid),
2165 * the OST will not use BRW timestamps. Sadly, there is no obvious
2166 * way to do this in a single call. bug 10150 */
2167 cl_req_attr_set(env, clerq, crattr,
2168 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2170 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2172 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2173 aa = ptlrpc_req_async_args(req);
2174 INIT_LIST_HEAD(&aa->aa_oaps);
2175 list_splice_init(&rpc_list, &aa->aa_oaps);
2176 INIT_LIST_HEAD(&aa->aa_exts);
2177 list_splice_init(ext_list, &aa->aa_exts);
2178 aa->aa_clerq = clerq;
2180 /* queued sync pages can be torn down while the pages
2181 * were between the pending list and the rpc */
2183 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2184 /* only one oap gets a request reference */
2187 if (oap->oap_interrupted && !req->rq_intr) {
2188 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2190 ptlrpc_mark_interrupted(req);
2194 tmp->oap_request = ptlrpc_request_addref(req);
/* Account the new RPC in the in-flight counters and the histograms; the
 * starting offset is converted from bytes to a page index first */
2196 client_obd_list_lock(&cli->cl_loi_list_lock);
2197 starting_offset >>= PAGE_CACHE_SHIFT;
2198 if (cmd == OBD_BRW_READ) {
2199 cli->cl_r_in_flight++;
2200 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2201 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2202 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2203 starting_offset + 1);
2205 cli->cl_w_in_flight++;
2206 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2207 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2208 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2209 starting_offset + 1);
2211 client_obd_list_unlock(&cli->cl_loi_list_lock);
2213 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2214 page_count, aa, cli->cl_r_in_flight,
2215 cli->cl_w_in_flight);
2217 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2218 * see which CPU/NUMA node the majority of pages were allocated
2219 * on, and try to assign the async RPC to the CPU core
2220 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2222 * But on the other hand, we expect that multiple ptlrpcd
2223 * threads and the initial write sponsor can run in parallel,
2224 * especially when data checksum is enabled, which is CPU-bound
2225 * operation and single ptlrpcd thread cannot process in time.
2226 * So more ptlrpcd threads sharing BRW load
2227 * (with PDL_POLICY_ROUND) seems better.
2229 ptlrpcd_add_req(req, pol, -1);
2235 cfs_memory_pressure_restore(mpflag);
2237 if (crattr != NULL) {
2238 capa_put(crattr->cra_capa);
2239 OBD_FREE(crattr, sizeof(*crattr));
2243 LASSERT(req == NULL);
2248 OBD_FREE(pga, sizeof(*pga) * page_count);
2249 /* this should happen rarely and is pretty bad, it makes the
2250 * pending list not follow the dirty order */
2251 while (!list_empty(ext_list)) {
2252 ext = list_entry(ext_list->next, struct osc_extent,
2254 list_del_init(&ext->oe_link);
2255 osc_extent_finish(env, ext, 0, rc);
2257 if (clerq && !IS_ERR(clerq))
2258 cl_req_completion(env, clerq, rc);
/*
 * Attach einfo->ei_cbdata to @lock as its l_ast_data when the lock has no
 * AST data yet, then test whether l_ast_data equals that data.
 *
 * The lock callbacks (blocking, completion, glimpse) and the resource type
 * are asserted to match @einfo.  The update is made while holding both the
 * resource lock of @lock and the osc_ast_guard spinlock.
 *
 * NOTE(review): the statement executed when the data matches and the
 * return statement are not in this listing; presumably the function
 * reports whether l_ast_data equals the requested data -- confirm.
 */
2263 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2264 struct ldlm_enqueue_info *einfo)
2266 void *data = einfo->ei_cbdata;
2269 LASSERT(lock != NULL);
2270 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2271 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2272 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2273 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2275 lock_res_and_lock(lock);
2276 spin_lock(&osc_ast_guard);
2278 if (lock->l_ast_data == NULL)
2279 lock->l_ast_data = data;
2280 if (lock->l_ast_data == data)
2283 spin_unlock(&osc_ast_guard);
2284 unlock_res_and_lock(lock);
/*
 * Handle-based variant of osc_set_lock_data_with_check(): resolve @lockh
 * with ldlm_handle2lock(), run the check on the resulting lock and drop the
 * lock reference again.
 *
 * NOTE(review): the condition guarding the two branches is not in this
 * listing; the error message suggests it is logged when the handle does
 * not resolve to a lock -- confirm against the complete file.
 */
2289 static int osc_set_data_with_check(struct lustre_handle *lockh,
2290 struct ldlm_enqueue_info *einfo)
2292 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2296 set = osc_set_lock_data_with_check(lock, einfo);
2297 LDLM_LOCK_PUT(lock);
2299 CERROR("lockh %p, data %p - client evicted?\n",
2300 lockh, einfo->ei_cbdata);
/*
 * Build the LDLM resource name from lsm->lsm_oi and call
 * ldlm_resource_iterate() on it in the namespace of the obd device behind
 * @exp, passing the iterator callback @replace and its argument @data.
 * The result of the iteration is not stored on the visible lines; the
 * return statement is not in this listing.
 */
2304 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2305 ldlm_iterator_t replace, void *data)
2307 struct ldlm_res_id res_id;
2308 struct obd_device *obd = class_exp2obd(exp);
2310 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2311 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
/*
 * Like osc_change_cbdata(), but the result of ldlm_resource_iterate() is
 * kept and tested against LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the values returned for each case, and the rest of the
 * original comment that follows, are on lines missing from this listing.
 */
2315 /* find any ldlm lock of the inode in osc
2319 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2320 ldlm_iterator_t replace, void *data)
2322 struct ldlm_res_id res_id;
2323 struct obd_device *obd = class_exp2obd(exp);
2326 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2327 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2328 if (rc == LDLM_ITER_STOP)
2330 if (rc == LDLM_ITER_CONTINUE)
/*
 * OSC-level completion of a lock enqueue.
 *
 * When @rc is ELDLM_LOCK_ABORTED the ldlm_reply is fetched from @req and a
 * non-zero lock_policy_res1 replaces @rc.  LDLM_FL_LVB_READY is then set in
 * *@flags for an aborted intent enqueue that is not AGL (a second
 * condition of that test is on a line missing from this listing), and the
 * kms, blocks and mtime from @lvb are logged.  Finally @upcall is invoked
 * with @cookie and the resulting code, and its result is stored in rc.
 *
 * NOTE(review): an enclosing test around the reply handling and the return
 * statement are not in this listing.
 */
2335 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2336 obd_enqueue_update_f upcall, void *cookie,
2337 __u64 *flags, int agl, int rc)
2339 int intent = *flags & LDLM_FL_HAS_INTENT;
2343 /* The request was created before ldlm_cli_enqueue call. */
2344 if (rc == ELDLM_LOCK_ABORTED) {
2345 struct ldlm_reply *rep;
2346 rep = req_capsule_server_get(&req->rq_pill,
2349 LASSERT(rep != NULL);
2350 if (rep->lock_policy_res1)
2351 rc = rep->lock_policy_res1;
2355 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2357 *flags |= LDLM_FL_LVB_READY;
2358 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2359 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2362 /* Call the update callback. */
2363 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for an asynchronous lock enqueue.
 *
 * The lock handle and mode are copied to locals first because the fields
 * behind @aa may be freed once the upcall has run.  The lock is looked up
 * from the handle and an extra reference of the same mode is taken so that
 * a blocking AST cannot arrive before the upcall.  The enqueue is then
 * completed with ldlm_cli_enqueue_fini() followed by osc_enqueue_fini().
 * Afterwards the reference taken by ldlm_cli_enqueue() is dropped when the
 * handle is still in use and rc is ELDLM_OK, the extra reference is
 * dropped, and the lock obtained from the handle is put.
 *
 * NOTE(review): the selection of lvb and lvb_len for the AGL aborted case
 * and the return statement are on lines missing from this listing.
 */
2367 static int osc_enqueue_interpret(const struct lu_env *env,
2368 struct ptlrpc_request *req,
2369 struct osc_enqueue_args *aa, int rc)
2371 struct ldlm_lock *lock;
2372 struct lustre_handle handle;
2374 struct ost_lvb *lvb;
2376 __u64 *flags = aa->oa_flags;
2378 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2379 * might be freed anytime after lock upcall has been called. */
2380 lustre_handle_copy(&handle, aa->oa_lockh);
2381 mode = aa->oa_ei->ei_mode;
2383 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2385 lock = ldlm_handle2lock(&handle);
2387 /* Take an additional reference so that a blocking AST that
2388 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2389 * to arrive after an upcall has been executed by
2390 * osc_enqueue_fini(). */
2391 ldlm_lock_addref(&handle, mode);
2393 /* Let CP AST to grant the lock first. */
2394 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2396 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2401 lvb_len = sizeof(*aa->oa_lvb);
2404 /* Complete obtaining the lock procedure. */
2405 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2406 mode, flags, lvb, lvb_len, &handle, rc);
2407 /* Complete osc stuff. */
2408 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2409 flags, aa->oa_agl, rc);
2411 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2413 /* Release the lock for async request. */
2414 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2416 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2417 * not already released by
2418 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2420 ldlm_lock_decref(&handle, mode);
2422 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2423 aa->oa_lockh, req, aa);
2424 ldlm_lock_decref(&handle, mode);
2425 LDLM_LOCK_PUT(lock);
/*
 * Update the per-object info @loi from @lvb after an enqueue on the lock
 * behind @lov_lockhp finished with @rc.
 *
 *  - ELDLM_OK: copy @lvb into loi->loi_lvb; take lvb_size as the candidate
 *    KMS, replaced by the lock extent end + 1 when the size lies beyond the
 *    extent end; raise the KMS with loi_kms_set() when the candidate is not
 *    below the current KMS; then allow the lock to be matched.
 *  - ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT in @flags (a glimpse): copy
 *    @lvb, allow matching and leave the KMS untouched.
 *  - ldlm_lock_fail_match() is called on a remaining path whose condition
 *    is not in this listing.
 *
 * The lock reference obtained from the handle is put at the end.  @mode is
 * not referenced on the visible lines.
 */
2429 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2430 struct lov_oinfo *loi, int flags,
2431 struct ost_lvb *lvb, __u32 mode, int rc)
2433 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2435 if (rc == ELDLM_OK) {
2438 LASSERT(lock != NULL);
2439 loi->loi_lvb = *lvb;
2440 tmp = loi->loi_lvb.lvb_size;
2441 /* Extend KMS up to the end of this lock and no further
2442 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2443 if (tmp > lock->l_policy_data.l_extent.end)
2444 tmp = lock->l_policy_data.l_extent.end + 1;
2445 if (tmp >= loi->loi_kms) {
2446 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2447 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2448 loi_kms_set(loi, tmp);
2450 LDLM_DEBUG(lock, "lock acquired, setting rss="
2451 LPU64"; leaving kms="LPU64", end="LPU64,
2452 loi->loi_lvb.lvb_size, loi->loi_kms,
2453 lock->l_policy_data.l_extent.end);
2455 ldlm_lock_allow_match(lock);
2456 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2457 LASSERT(lock != NULL);
2458 loi->loi_lvb = *lvb;
2459 ldlm_lock_allow_match(lock);
2460 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2461 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2467 ldlm_lock_fail_match(lock);
2469 LDLM_LOCK_PUT(lock);
2472 EXPORT_SYMBOL(osc_update_enqueue);
/* Sentinel pointer value (1) used as a request-set marker:
 * osc_enqueue_base() compares its @rqset argument against it and, when
 * equal, queues the request with ptlrpcd_add_req() instead of
 * ptlrpc_set_add_req(). */
2474 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2476 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2477 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2478 * other synchronous requests, however keeping some locks and trying to obtain
2479 * others may take a considerable amount of time in a case of ost failure; and
2480 * when other sync requests do not get released lock from a client, the client
2481 * is excluded from the cluster -- such scenarios make life difficult, so
2482 * release locks just after they are obtained. */
/*
 * Obtain an extent lock on @res_id, either by matching a cached lock or by
 * sending an enqueue RPC.
 *
 * Steps visible in this function:
 *  - @policy's extent is widened in place to page boundaries.
 *  - ldlm_lock_match() is tried with (*@flags | match_lvb); match_lvb is
 *    LDLM_FL_LVB_READY unless @agl is non-zero.
 *  - On a match:
 *      * for AGL, if the matched lock lacks LDLM_FL_LVB_READY, the match
 *        reference is dropped (the in-line comment says -ECANCELED is
 *        reported to the caller);
 *      * else, if osc_set_lock_data_with_check() accepts the lock,
 *        LDLM_FL_LVB_READY is set in *@flags, @upcall is invoked with
 *        ELDLM_OK and lock references are adjusted (PR/PW swap when the
 *        matched mode differs from einfo->ei_mode, decref for async users);
 *      * otherwise the match reference is dropped.
 *  - Without a usable match, an RQF_LDLM_ENQUEUE_LVB request is allocated
 *    and prepared, LDLM_FL_BLOCK_GRANTED is cleared from *@flags, and
 *    ldlm_cli_enqueue() is called with @lvb as the LVB buffer.
 *  - In the request-set path the osc_enqueue_args stored in the request
 *    record @flags, @upcall, @cookie and @lockh, the reply interpreter is
 *    set to osc_enqueue_interpret(), and the request is queued on ptlrpcd
 *    when @rqset == PTLRPCD_SET, or added to @rqset otherwise.
 *  - The remaining path completes through osc_enqueue_fini().
 *
 * NOTE(review): @flags and @lockh are stored as pointers in the async args,
 * so they presumably must stay valid until the interpreter has run --
 * confirm against callers.
 */
2483 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2484 __u64 *flags, ldlm_policy_data_t *policy,
2485 struct ost_lvb *lvb, int kms_valid,
2486 obd_enqueue_update_f upcall, void *cookie,
2487 struct ldlm_enqueue_info *einfo,
2488 struct lustre_handle *lockh,
2489 struct ptlrpc_request_set *rqset, int async, int agl)
2491 struct obd_device *obd = exp->exp_obd;
2492 struct ptlrpc_request *req = NULL;
2493 int intent = *flags & LDLM_FL_HAS_INTENT;
2494 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2499 /* Filesystem lock extents are extended to page boundaries so that
2500 * dealing with the page cache is a little smoother. */
2501 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2502 policy->l_extent.end |= ~CFS_PAGE_MASK;
2505 * kms is not valid when either object is completely fresh (so that no
2506 * locks are cached), or object was evicted. In the latter case cached
2507 * lock cannot be used, because it would prime inode state with
2508 * potentially stale LVB.
2513 /* Next, search for already existing extent locks that will cover us */
2514 /* If we're trying to read, we also search for an existing PW lock. The
2515 * VFS and page cache already protect us locally, so lots of readers/
2516 * writers can share a single PW lock.
2518 * There are problems with conversion deadlocks, so instead of
2519 * converting a read lock to a write lock, we'll just enqueue a new
2522 * At some point we should cancel the read lock instead of making them
2523 * send us a blocking callback, but there are problems with canceling
2524 * locks out from other users right now, too. */
2525 mode = einfo->ei_mode;
2526 if (einfo->ei_mode == LCK_PR)
2528 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2529 einfo->ei_type, policy, mode, lockh, 0);
2531 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
/* NOTE(review): 'matched' is dereferenced below without a visible NULL
 * check, whereas osc_update_enqueue() asserts the result of
 * ldlm_handle2lock() -- confirm the handle cannot go stale here. */
2533 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2534 /* For AGL, if enqueue RPC is sent but the lock is not
2535 * granted, then skip processing this stripe.
2536 * Return -ECANCELED to tell the caller. */
2537 ldlm_lock_decref(lockh, mode);
2538 LDLM_LOCK_PUT(matched);
2540 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2541 *flags |= LDLM_FL_LVB_READY;
2542 /* addref the lock only if not async requests and PW
2543 * lock is matched whereas we asked for PR. */
2544 if (!rqset && einfo->ei_mode != mode)
2545 ldlm_lock_addref(lockh, LCK_PR);
2547 /* I would like to be able to ASSERT here that
2548 * rss <= kms, but I can't, for reasons which
2549 * are explained in lov_enqueue() */
2552 /* We already have a lock, and it's referenced.
2554 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2555 * AGL upcall may change it to CLS_HELD directly. */
2556 (*upcall)(cookie, ELDLM_OK);
2558 if (einfo->ei_mode != mode)
2559 ldlm_lock_decref(lockh, LCK_PW);
2561 /* For async requests, decref the lock. */
2562 ldlm_lock_decref(lockh, einfo->ei_mode);
2563 LDLM_LOCK_PUT(matched);
2566 ldlm_lock_decref(lockh, mode);
2567 LDLM_LOCK_PUT(matched);
2574 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2575 &RQF_LDLM_ENQUEUE_LVB);
2579 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2581 ptlrpc_request_free(req);
2585 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2587 ptlrpc_request_set_replen(req);
2590 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2591 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2593 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2594 sizeof(*lvb), LVB_T_OST, lockh, async);
2597 struct osc_enqueue_args *aa;
2598 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2599 aa = ptlrpc_req_async_args(req);
2602 aa->oa_flags = flags;
2603 aa->oa_upcall = upcall;
2604 aa->oa_cookie = cookie;
2606 aa->oa_lockh = lockh;
2609 req->rq_interpret_reply =
2610 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2611 if (rqset == PTLRPCD_SET)
2612 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2614 ptlrpc_set_add_req(rqset, req);
2615 } else if (intent) {
2616 ptlrpc_req_finished(req);
2621 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2623 ptlrpc_req_finished(req);
/*
 * obd_ops enqueue entry point: thin adapter around osc_enqueue_base().
 *
 * The resource name is built from oinfo->oi_md->lsm_oi.  The LVB buffer and
 * kms_valid flag come from the first entry of lsm_oinfo[], oinfo->oi_cb_up
 * is the upcall with @oinfo itself as its cookie, the request is treated as
 * asynchronous exactly when @rqset is non-NULL, and AGL is disabled (0).
 */
2628 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2629 struct ldlm_enqueue_info *einfo,
2630 struct ptlrpc_request_set *rqset)
2632 struct ldlm_res_id res_id;
2636 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2637 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2638 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2639 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2640 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2641 rqset, rqset != NULL, 0);
/*
 * Look for an already cached lock on @res_id covering @policy.
 *
 * - OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH) is a fault-injection hook.
 * - @policy's extent is widened in place to page boundaries.
 * - ldlm_lock_match() is called with a local copy of *@flags; its result
 *   (the matched mode) is kept in rc.
 * - If osc_set_data_with_check(@lockh, @data) rejects the lock, the match
 *   reference is dropped, unless LDLM_FL_TEST_LOCK was requested.
 * - If the matched mode differs from the requested @mode (and this is not a
 *   LDLM_FL_TEST_LOCK probe), the PW reference is exchanged for a PR one.
 */
2645 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2646 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2647 int *flags, void *data, struct lustre_handle *lockh,
2650 struct obd_device *obd = exp->exp_obd;
2651 int lflags = *flags;
2655 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2658 /* Filesystem lock extents are extended to page boundaries so that
2659 * dealing with the page cache is a little smoother */
2660 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2661 policy->l_extent.end |= ~CFS_PAGE_MASK;
2663 /* Next, search for already existing extent locks that will cover us */
2664 /* If we're trying to read, we also search for an existing PW lock. The
2665 * VFS and page cache already protect us locally, so lots of readers/
2666 * writers can share a single PW lock. */
2670 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2671 res_id, type, policy, rc, lockh, unref);
2674 if (!osc_set_data_with_check(lockh, data)) {
2675 if (!(lflags & LDLM_FL_TEST_LOCK))
2676 ldlm_lock_decref(lockh, rc);
2680 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2681 ldlm_lock_addref(lockh, LCK_PR);
2682 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Drop one @mode reference on the lock behind @lockh.  LCK_GROUP locks
 * (flagged unlikely) go through ldlm_lock_decref_and_cancel(); every other
 * mode uses a plain ldlm_lock_decref().
 */
2689 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2693 if (unlikely(mode == LCK_GROUP))
2694 ldlm_lock_decref_and_cancel(lockh, mode);
2696 ldlm_lock_decref(lockh, mode);
/* obd_ops cancel entry point: forwards @lockh and @mode to
 * osc_cancel_base() and returns its result. */
2701 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2702 __u32 mode, struct lustre_handle *lockh)
2705 RETURN(osc_cancel_base(lockh, mode));
/*
 * Cancel unused locks in this device's namespace via
 * ldlm_cli_cancel_unused().  A resource name is built from lsm->lsm_oi;
 * 'resp' starts out NULL and is what is actually handed to the LDLM call.
 * NOTE(review): presumably resp is pointed at res_id only when @lsm is
 * given, so a NULL @lsm means "all resources" -- confirm.
 */
2708 static int osc_cancel_unused(struct obd_export *exp,
2709 struct lov_stripe_md *lsm,
2710 ldlm_cancel_flags_t flags,
2713 struct obd_device *obd = class_exp2obd(exp);
2714 struct ldlm_res_id res_id, *resp = NULL;
2717 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2721 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
/*
 * Reply interpreter for the asynchronous statfs request issued by
 * osc_statfs_async().
 *
 * -ENOTCONN / -EAGAIN get special handling when the caller set
 * OBD_STATFS_NODELAY in oi_flags.  Otherwise the RMF_OBD_STATFS reply
 * buffer is fetched (-EPROTO if it is missing) and copied into
 * aa->aa_oi->oi_osfs.  The caller's oi_cb_up() callback is invoked with the
 * resulting status and its return value becomes rc.
 */
2724 static int osc_statfs_interpret(const struct lu_env *env,
2725 struct ptlrpc_request *req,
2726 struct osc_async_args *aa, int rc)
2728 struct obd_statfs *msfs;
2732 /* The request has in fact never been sent
2733 * due to issues at a higher level (LOV).
2734 * Exit immediately since the caller is
2735 * aware of the problem and takes care
2736 * of the clean up */
2739 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2740 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2746 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2748 GOTO(out, rc = -EPROTO);
2751 *aa->aa_oi->oi_osfs = *msfs;
2753 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an OST_STATFS request on @rqset; the reply is handled by
 * osc_statfs_interpret().
 *
 * The request is allocated on the device's client import, packed for
 * LUSTRE_OST_VERSION / OST_STATFS (and freed again if packing fails), and
 * sent through OST_CREATE_PORTAL.  With OBD_STATFS_NODELAY in
 * oinfo->oi_flags the request is marked no-resend / no-delay.
 */
2757 static int osc_statfs_async(struct obd_export *exp,
2758 struct obd_info *oinfo, __u64 max_age,
2759 struct ptlrpc_request_set *rqset)
2761 struct obd_device *obd = class_exp2obd(exp);
2762 struct ptlrpc_request *req;
2763 struct osc_async_args *aa;
2767 /* We could possibly pass max_age in the request (as an absolute
2768 * timestamp or a "seconds.usec ago") so the target can avoid doing
2769 * extra calls into the filesystem if that isn't necessary (e.g.
2770 * during mount that would help a bit). Having relative timestamps
2771 * is not so great if request processing is slow, while absolute
2772 * timestamps are not ideal because they need time synchronization. */
2773 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2777 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2779 ptlrpc_request_free(req);
2782 ptlrpc_request_set_replen(req);
2783 req->rq_request_portal = OST_CREATE_PORTAL;
2784 ptlrpc_at_set_req_timeout(req);
2786 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2787 /* procfs requests must not block waiting for statfs, to avoid deadlock */
2788 req->rq_no_resend = 1;
2789 req->rq_no_delay = 1;
2792 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2793 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2794 aa = ptlrpc_req_async_args(req);
2797 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous statfs: send OST_STATFS and wait for the reply with
 * ptlrpc_queue_wait().
 *
 * A reference on the client import is taken under the read side of cl_sem
 * (see the comment below about racing with client_disconnect_export) and
 * released with class_import_put() after the request has been allocated.
 * The request goes through OST_CREATE_PORTAL; OBD_STATFS_NODELAY in @flags
 * marks it no-resend / no-delay.  A missing RMF_OBD_STATFS reply buffer
 * yields -EPROTO.  The request is released with ptlrpc_req_finished().
 */
2801 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2802 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2804 struct obd_device *obd = class_exp2obd(exp);
2805 struct obd_statfs *msfs;
2806 struct ptlrpc_request *req;
2807 struct obd_import *imp = NULL;
2811 /*Since the request might also come from lprocfs, so we need
2812 *sync this with client_disconnect_export Bug15684*/
2813 down_read(&obd->u.cli.cl_sem);
2814 if (obd->u.cli.cl_import)
2815 imp = class_import_get(obd->u.cli.cl_import);
2816 up_read(&obd->u.cli.cl_sem);
2820 /* We could possibly pass max_age in the request (as an absolute
2821 * timestamp or a "seconds.usec ago") so the target can avoid doing
2822 * extra calls into the filesystem if that isn't necessary (e.g.
2823 * during mount that would help a bit). Having relative timestamps
2824 * is not so great if request processing is slow, while absolute
2825 * timestamps are not ideal because they need time synchronization. */
2826 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2828 class_import_put(imp);
2833 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2835 ptlrpc_request_free(req);
2838 ptlrpc_request_set_replen(req);
2839 req->rq_request_portal = OST_CREATE_PORTAL;
2840 ptlrpc_at_set_req_timeout(req);
2842 if (flags & OBD_STATFS_NODELAY) {
2843 /* procfs requests must not block waiting for statfs, to avoid deadlock */
2844 req->rq_no_resend = 1;
2845 req->rq_no_delay = 1;
2848 rc = ptlrpc_queue_wait(req);
2852 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2854 GOTO(out, rc = -EPROTO);
2861 ptlrpc_req_finished(req);
2865 /* Retrieve object striping information.
2867 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2868 * the maximum number of OST indices which will fit in the user buffer.
2869 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Flow: a v1-sized header is copied in from @lump and its magic must be
 * LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3.  When the user asked for at
 * least one stripe, a kernel buffer sized by lov_mds_md_size() is
 * allocated and its first object slot receives lsm->lsm_oi; otherwise only
 * the header-sized structure is used.  lmm_oi is set from @lsm,
 * lmm_stripe_count is forced to 1, and the result is copied back to @lump.
2871 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2873 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2874 struct lov_user_md_v3 lum, *lumk;
2875 struct lov_user_ost_data_v1 *lmm_objects;
2876 int rc = 0, lum_size;
2882 /* we only need the header part from user space to get lmm_magic and
2883 * lmm_stripe_count, (the header part is common to v1 and v3) */
2884 lum_size = sizeof(struct lov_user_md_v1);
2885 if (copy_from_user(&lum, lump, lum_size))
2888 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2889 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2892 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2893 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2894 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2895 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2897 /* we can use lov_mds_md_size() to compute lum_size
2898 * because lov_user_md_vX and lov_mds_md_vX have the same size */
/* NOTE(review): lum.lmm_stripe_count was just read from user space and
 * directly sizes the allocation below; no upper bound is visible here --
 * confirm whether one is enforced elsewhere. */
2899 if (lum.lmm_stripe_count > 0) {
2900 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2901 OBD_ALLOC(lumk, lum_size);
2905 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2907 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2909 lmm_objects = &(lumk->lmm_objects[0]);
2910 lmm_objects->l_ost_oi = lsm->lsm_oi;
2912 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2916 lumk->lmm_oi = lsm->lsm_oi;
2917 lumk->lmm_stripe_count = 1;
2919 if (copy_to_user(lump, lumk, lum_size))
2923 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for the OSC device.
 *
 * A module reference is held for the duration of the call
 * (try_module_get() ... module_put()).  Commands handled:
 *   OBD_IOC_LOV_GET_CONFIG  - fetch the ioctl data from @uarg, require that
 *                             inlbuf1 can hold a struct lov_desc and inlbuf2
 *                             a struct obd_uuid, fill in a one-target
 *                             descriptor plus this device's UUID, and copy
 *                             the buffer back to user space
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd(exp, karg)
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe(karg, uarg)
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import() using inlbuf1
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active()
 *   OBD_IOC_POLL_QUOTACHECK - osc_quota_poll_check()
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping()
 * Anything else is logged and fails with -ENOTTY.
 */
2929 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2930 void *karg, void *uarg)
2932 struct obd_device *obd = exp->exp_obd;
2933 struct obd_ioctl_data *data = karg;
2937 if (!try_module_get(THIS_MODULE)) {
2938 CERROR("Can't get module. Is it alive?");
/* NOTE(review): the message above has no trailing "\n", unlike the other
 * CERROR() calls in this file -- likely an oversight. */
2942 case OBD_IOC_LOV_GET_CONFIG: {
2944 struct lov_desc *desc;
2945 struct obd_uuid uuid;
2949 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2950 GOTO(out, err = -EINVAL);
2952 data = (struct obd_ioctl_data *)buf;
2954 if (sizeof(*desc) > data->ioc_inllen1) {
2955 obd_ioctl_freedata(buf, len);
2956 GOTO(out, err = -EINVAL);
2959 if (data->ioc_inllen2 < sizeof(uuid)) {
2960 obd_ioctl_freedata(buf, len);
2961 GOTO(out, err = -EINVAL);
2964 desc = (struct lov_desc *)data->ioc_inlbuf1;
2965 desc->ld_tgt_count = 1;
2966 desc->ld_active_tgt_count = 1;
2967 desc->ld_default_stripe_count = 1;
2968 desc->ld_default_stripe_size = 0;
2969 desc->ld_default_stripe_offset = 0;
2970 desc->ld_pattern = 0;
2971 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2973 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2975 err = copy_to_user((void *)uarg, buf, len);
2978 obd_ioctl_freedata(buf, len);
2981 case LL_IOC_LOV_SETSTRIPE:
2982 err = obd_alloc_memmd(exp, karg);
2986 case LL_IOC_LOV_GETSTRIPE:
2987 err = osc_getstripe(karg, uarg);
2989 case OBD_IOC_CLIENT_RECOVER:
2990 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2991 data->ioc_inlbuf1, 0);
2995 case IOC_OSC_SET_ACTIVE:
2996 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2999 case OBD_IOC_POLL_QUOTACHECK:
3000 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3002 case OBD_IOC_PING_TARGET:
3003 err = ptlrpc_obd_ping(obd);
3006 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3007 cmd, current_comm());
3008 GOTO(out, err = -ENOTTY);
3011 module_put(THIS_MODULE);
/*
 * Key/value query interface.  @vallen and @val are checked for NULL first.
 *
 * KEY_LOCK_TO_STRIPE:
 *   *@vallen is set to sizeof(__u32); @val is treated as the stripe index.
 * KEY_LAST_ID:
 *   sends an OST_GET_INFO RPC (RQF_OST_GET_INFO_LAST_ID) carrying @key,
 *   marked no-delay / no-resend, and stores the RMF_OBD_ID reply in @val as
 *   an obd_id; a missing reply buffer yields -EPROTO.
 * KEY_FIEMAP:
 *   @key is a struct ll_fiemap_info_key.  When FIEMAP_FLAG_SYNC is set, the
 *   requested range is rounded to page boundaries (capped at
 *   OBD_OBJECT_EOF) and a cached PR or PW extent lock is looked up.  A
 *   matched PW reference is converted to PR.  Without a cached lock,
 *   OBD_FL_SRVLOCK is set in the request's obdo so the lock is taken on the
 *   server side.  Then an OST_GET_INFO RPC (RQF_OST_GET_INFO_FIEMAP) is
 *   sent with @key and the current contents of @val, and *@vallen bytes of
 *   the reply are copied back into @val.  The PR reference is dropped on
 *   the way out.
 */
3015 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3016 obd_count keylen, void *key, __u32 *vallen, void *val,
3017 struct lov_stripe_md *lsm)
3020 if (!vallen || !val)
3023 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3024 __u32 *stripe = val;
3025 *vallen = sizeof(*stripe);
3028 } else if (KEY_IS(KEY_LAST_ID)) {
3029 struct ptlrpc_request *req;
3034 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3035 &RQF_OST_GET_INFO_LAST_ID);
3039 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3040 RCL_CLIENT, keylen);
3041 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3043 ptlrpc_request_free(req);
3047 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3048 memcpy(tmp, key, keylen);
3050 req->rq_no_delay = req->rq_no_resend = 1;
3051 ptlrpc_request_set_replen(req);
3052 rc = ptlrpc_queue_wait(req);
3056 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3058 GOTO(out, rc = -EPROTO);
3060 *((obd_id *)val) = *reply;
3062 ptlrpc_req_finished(req);
3064 } else if (KEY_IS(KEY_FIEMAP)) {
3065 struct ll_fiemap_info_key *fm_key =
3066 (struct ll_fiemap_info_key *)key;
3067 struct ldlm_res_id res_id;
3068 ldlm_policy_data_t policy;
3069 struct lustre_handle lockh;
3070 ldlm_mode_t mode = 0;
3071 struct ptlrpc_request *req;
3072 struct ll_user_fiemap *reply;
3076 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3079 policy.l_extent.start = fm_key->fiemap.fm_start &
3082 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3083 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3084 policy.l_extent.end = OBD_OBJECT_EOF;
3086 policy.l_extent.end = (fm_key->fiemap.fm_start +
3087 fm_key->fiemap.fm_length +
3088 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3090 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3091 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3092 LDLM_FL_BLOCK_GRANTED |
3094 &res_id, LDLM_EXTENT, &policy,
3095 LCK_PR | LCK_PW, &lockh, 0);
3096 if (mode) { /* lock is cached on client */
3097 if (mode != LCK_PR) {
3098 ldlm_lock_addref(&lockh, LCK_PR);
3099 ldlm_lock_decref(&lockh, LCK_PW);
3101 } else { /* no cached lock, needs acquire lock on server side */
3102 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3103 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3107 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3108 &RQF_OST_GET_INFO_FIEMAP);
3110 GOTO(drop_lock, rc = -ENOMEM);
3112 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3113 RCL_CLIENT, keylen);
3114 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3115 RCL_CLIENT, *vallen);
3116 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3117 RCL_SERVER, *vallen);
3119 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3121 ptlrpc_request_free(req);
3122 GOTO(drop_lock, rc);
3125 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3126 memcpy(tmp, key, keylen);
3127 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3128 memcpy(tmp, val, *vallen);
3130 ptlrpc_request_set_replen(req);
3131 rc = ptlrpc_queue_wait(req);
3135 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3137 GOTO(fini_req, rc = -EPROTO);
3139 memcpy(val, reply, *vallen);
3141 ptlrpc_req_finished(req);
3144 ldlm_lock_decref(&lockh, LCK_PR);
/*
 * Key/value setter.  Several keys are handled locally; everything else is
 * sent to the OST as an OST_SET_INFO RPC.
 *
 * Local keys:
 *   KEY_CHECKSUM         - @val must be an int (vallen is checked); stored
 *                          as 0/1 in cl_checksum
 *   KEY_SPTLRPC_CONF     - sptlrpc_conf_client_adapt()
 *   KEY_FLUSH_CTX        - sptlrpc_import_flush_my_ctx()
 *   KEY_CACHE_SET        - attach this client to the cl_client_cache passed
 *                          in @val (asserted to happen only once): take a
 *                          user reference, point cl_lru_left at the shared
 *                          counter and link cl_lru_osc into ccc_lru under
 *                          ccc_lru_lock
 *   KEY_CACHE_LRU_SHRINK - shrink the LRU by at most min(half of
 *                          cl_lru_in_list, *(int *)@val) via
 *                          osc_lru_shrink()
 *
 * Remote keys:
 *   A NULL @set is tolerated only for KEY_GRANT_SHRINK.  That key uses the
 *   RQF_OST_SET_GRANT_INFO format, carries the obdo from @val in its async
 *   args, completes in osc_shrink_grant_interpret() and is queued on
 *   ptlrpcd.  Every other key is added to @set, followed by a
 *   ptlrpc_check_set() call.
 */
3151 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3152 obd_count keylen, void *key, obd_count vallen,
3153 void *val, struct ptlrpc_request_set *set)
3155 struct ptlrpc_request *req;
3156 struct obd_device *obd = exp->exp_obd;
3157 struct obd_import *imp = class_exp2cliimp(exp);
3162 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3164 if (KEY_IS(KEY_CHECKSUM)) {
3165 if (vallen != sizeof(int))
3167 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3171 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3172 sptlrpc_conf_client_adapt(obd);
3176 if (KEY_IS(KEY_FLUSH_CTX)) {
3177 sptlrpc_import_flush_my_ctx(imp);
3181 if (KEY_IS(KEY_CACHE_SET)) {
3182 struct client_obd *cli = &obd->u.cli;
3184 LASSERT(cli->cl_cache == NULL); /* only once */
3185 cli->cl_cache = (struct cl_client_cache *)val;
3186 atomic_inc(&cli->cl_cache->ccc_users);
3187 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3189 /* add this osc into entity list */
3190 LASSERT(list_empty(&cli->cl_lru_osc));
3191 spin_lock(&cli->cl_cache->ccc_lru_lock);
3192 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3193 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3198 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3199 struct client_obd *cli = &obd->u.cli;
3200 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3201 int target = *(int *)val;
/* NOTE(review): unlike KEY_CHECKSUM, no vallen == sizeof(int) check is
 * visible before val is read as an int here -- confirm callers. */
3203 nr = osc_lru_shrink(cli, min(nr, target));
3208 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3211 /* We pass all other commands directly to OST. Since nobody calls osc
3212 methods directly and everybody is supposed to go through LOV, we
3213 assume lov checked invalid values for us.
3214 The only recognised values so far are evict_by_nid and mds_conn.
3215 Even if something bad goes through, we'd get a -EINVAL from OST
3218 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3219 &RQF_OST_SET_GRANT_INFO :
3224 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3225 RCL_CLIENT, keylen);
3226 if (!KEY_IS(KEY_GRANT_SHRINK))
3227 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3228 RCL_CLIENT, vallen);
3229 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3231 ptlrpc_request_free(req);
3235 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3236 memcpy(tmp, key, keylen);
3237 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3240 memcpy(tmp, val, vallen);
3242 if (KEY_IS(KEY_GRANT_SHRINK)) {
3243 struct osc_grant_args *aa;
3246 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3247 aa = ptlrpc_req_async_args(req);
3250 ptlrpc_req_finished(req);
3253 *oa = ((struct ost_body *)val)->oa;
3255 req->rq_interpret_reply = osc_shrink_grant_interpret;
3258 ptlrpc_request_set_replen(req);
3259 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3260 LASSERT(set != NULL);
3261 ptlrpc_set_add_req(set, req);
3262 ptlrpc_check_set(NULL, set);
3264 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* obd_ops llog_init hook.  Per the in-body comment this path is legacy and
 * slated for removal; no llog setup is visible here. */
3270 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3271 struct obd_device *disk_obd, int *index)
3273 /* this code is not supposed to be used with LOD/OSP
3274 * to be removed soon */
/*
 * obd_ops llog_finish hook: tear down the two llog contexts of this
 * device.  The LLOG_MDS_OST_ORIG_CTXT context has its catalog handle closed
 * with llog_cat_close() before llog_cleanup(); the LLOG_SIZE_REPL_CTXT
 * context is only cleaned up.
 */
3279 static int osc_llog_finish(struct obd_device *obd, int count)
3281 struct llog_ctxt *ctxt;
3285 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3287 llog_cat_close(NULL, ctxt->loc_handle);
3288 llog_cleanup(NULL, ctxt);
3291 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3293 llog_cleanup(NULL, ctxt);
/*
 * obd_ops reconnect hook: when @data is given and requests
 * OBD_CONNECT_GRANT, fill in the grant to ask for on reconnect.
 *
 * Under cl_loi_list_lock, ocd_grant becomes cl_avail_grant + cl_dirty, or
 * 2 * cli_brw_size(obd) when that sum is zero.  The accumulated
 * cl_lost_grant is captured for the debug message and then reset to 0.
 */
3297 static int osc_reconnect(const struct lu_env *env,
3298 struct obd_export *exp, struct obd_device *obd,
3299 struct obd_uuid *cluuid,
3300 struct obd_connect_data *data,
3303 struct client_obd *cli = &obd->u.cli;
3305 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3308 client_obd_list_lock(&cli->cl_loi_list_lock);
3309 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3310 2 * cli_brw_size(obd);
3311 lost_grant = cli->cl_lost_grant;
3312 cli->cl_lost_grant = 0;
3313 client_obd_list_unlock(&cli->cl_loi_list_lock);
3315 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3316 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3317 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * obd_ops disconnect hook.
 *
 * If the LLOG_SIZE_REPL_CTXT context exists, it is synced with llog_sync()
 * when this is the last connection (cl_conn_count == 1) and then released;
 * otherwise a debug message is logged.  The export is then disconnected
 * with client_disconnect_export().  Only afterwards, and only if the import
 * is already gone, is the client removed from the grant-shrink list -- the
 * ordering rationale is given in the long comment in the body.
 */
3323 static int osc_disconnect(struct obd_export *exp)
3325 struct obd_device *obd = class_exp2obd(exp);
3326 struct llog_ctxt *ctxt;
3329 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3331 if (obd->u.cli.cl_conn_count == 1) {
3332 /* Flush any remaining cancel messages out to the
3334 llog_sync(ctxt, exp, 0);
3336 llog_ctxt_put(ctxt);
3338 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3342 rc = client_disconnect_export(exp);
3344 * Initially we put del_shrink_grant before disconnect_export, but it
3345 * causes the following problem if setup (connect) and cleanup
3346 * (disconnect) are tangled together.
3347 * connect p1 disconnect p2
3348 * ptlrpc_connect_import
3349 * ............... class_manual_cleanup
3352 * ptlrpc_connect_interrupt
3354 * add this client to shrink list
3356 * Bang! pinger trigger the shrink.
3357 * So the osc should be disconnected from the shrink list, after we
3358 * are sure the import has been destroyed. BUG18662
3360 if (obd->u.cli.cl_import == NULL)
3361 osc_del_shrink_grant(&obd->u.cli);
/*
 * obd_ops import_event hook: react to state changes of the client import.
 *
 *   IMP_EVENT_DISCON     - zero cl_avail_grant and cl_lost_grant under
 *                          cl_loi_list_lock
 *   IMP_EVENT_INACTIVE   - notify the observer (OBD_NOTIFY_INACTIVE)
 *   IMP_EVENT_INVALIDATE - with a cl environment, unplug pending I/O via
 *                          osc_io_unplug() and run a local-only cleanup of
 *                          the namespace
 *   IMP_EVENT_ACTIVE     - notify the observer (OBD_NOTIFY_ACTIVE)
 *   IMP_EVENT_OCD        - initialise grant if OBD_CONNECT_GRANT was
 *                          negotiated, switch to OST_REQUEST_PORTAL if
 *                          OBD_CONNECT_REQPORTAL was, then notify the
 *                          observer (OBD_NOTIFY_OCD)
 *   IMP_EVENT_DEACTIVATE / IMP_EVENT_ACTIVATE - notify the observer
 * Unknown events are reported with CERROR().
 */
3365 static int osc_import_event(struct obd_device *obd,
3366 struct obd_import *imp,
3367 enum obd_import_event event)
3369 struct client_obd *cli;
3373 LASSERT(imp->imp_obd == obd);
3376 case IMP_EVENT_DISCON: {
3378 client_obd_list_lock(&cli->cl_loi_list_lock);
3379 cli->cl_avail_grant = 0;
3380 cli->cl_lost_grant = 0;
3381 client_obd_list_unlock(&cli->cl_loi_list_lock);
3384 case IMP_EVENT_INACTIVE: {
3385 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3388 case IMP_EVENT_INVALIDATE: {
3389 struct ldlm_namespace *ns = obd->obd_namespace;
3393 env = cl_env_get(&refcheck);
3397 /* all pages go to failing rpcs due to the invalid
3399 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3401 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3402 cl_env_put(env, &refcheck);
3407 case IMP_EVENT_ACTIVE: {
3408 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3411 case IMP_EVENT_OCD: {
3412 struct obd_connect_data *ocd = &imp->imp_connect_data;
3414 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3415 osc_init_grant(&obd->u.cli, ocd);
3418 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3419 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3421 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3424 case IMP_EVENT_DEACTIVATE: {
3425 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3428 case IMP_EVENT_ACTIVATE: {
3429 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3433 CERROR("Unknown import event %d\n", event);
/*
 * Registered in osc_setup() through ns_register_cancel().  The test below
 * selects extent locks granted in LCK_PR or LCK_CR mode for which
 * osc_dlm_lock_pageref() reports 0.  check_res_locked() is called on the
 * lock's resource first.
 */
3440 * Determine whether the lock can be canceled before replaying the lock
3441 * during recovery, see bug16774 for detailed information.
3443 * \retval zero the lock can't be canceled
3444 * \retval other ok to cancel
3446 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3448 check_res_locked(lock->l_resource);
3451 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3453 * XXX as a future improvement, we can also cancel unused write lock
3454 * if it doesn't have dirty data and active mmaps.
3456 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3457 (lock->l_granted_mode == LCK_PR ||
3458 lock->l_granted_mode == LCK_CR) &&
3459 (osc_dlm_lock_pageref(lock) == 0))
/* Work callback installed by osc_setup() through ptlrpcd_alloc_work().
 * @data is the client_obd; the callback logs and then kicks pending I/O
 * with osc_io_unplug() using PDL_POLICY_SAME. */
3465 static int brw_queue_work(const struct lu_env *env, void *data)
3467 struct client_obd *cli = data;
3469 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3471 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * obd_ops setup hook for an OSC device.
 *
 * Order of operations:
 *   1. ptlrpcd_addref()
 *   2. client_obd_setup()
 *   3. allocate the writeback work item (brw_queue_work) and store it in
 *      cl_writeback_work
 *   4. osc_quota_setup()
 *   5. set the grant-shrink interval and register the lprocfs entries
 *   6. create the import's request pool with cl_max_rpcs_in_flight + 2
 *      entries (see the comment below for the reasoning)
 *   7. initialise cl_grant_shrink_list and register
 *      osc_cancel_for_recovery() with the namespace
 *
 * Failures unwind through the out_ptlrpcd_work / out_client_setup /
 * out_ptlrpcd labels: the work item is destroyed and client_obd_cleanup()
 * is called.
 */
3475 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3477 struct lprocfs_static_vars lvars = { 0 };
3478 struct client_obd *cli = &obd->u.cli;
3483 rc = ptlrpcd_addref();
3487 rc = client_obd_setup(obd, lcfg);
3489 GOTO(out_ptlrpcd, rc);
3491 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3492 if (IS_ERR(handler))
3493 GOTO(out_client_setup, rc = PTR_ERR(handler));
3494 cli->cl_writeback_work = handler;
3496 rc = osc_quota_setup(obd);
3498 GOTO(out_ptlrpcd_work, rc);
3500 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3501 lprocfs_osc_init_vars(&lvars);
3502 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3503 lproc_osc_attach_seqstat(obd);
3504 sptlrpc_lprocfs_cliobd_attach(obd);
3505 ptlrpc_lprocfs_register_obd(obd);
3508 /* We need to allocate a few requests more, because
3509 * brw_interpret tries to create new requests before freeing
3510 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3511 * reserved, but I'm afraid that might be too much wasted RAM
3512 * in fact, so 2 is just my guess and still should work. */
3513 cli->cl_import->imp_rq_pool =
3514 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3516 ptlrpc_add_rqs_to_pool);
3518 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3519 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3523 ptlrpcd_destroy_work(handler);
3525 client_obd_cleanup(obd);
/*
 * obd_ops precleanup hook, called once per cleanup @stage.
 *
 *   OBD_CLEANUP_EARLY   - deactivate the import and clear imp_pingable
 *                         under imp_lock
 *   OBD_CLEANUP_EXPORTS - wait for zombie exports (obd_zombie_barrier()),
 *                         destroy the writeback work item, clean up the
 *                         client import, remove the lprocfs entries and
 *                         finish llog handling; a failure there is reported
 *                         with CERROR()
 */
3531 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3537 case OBD_CLEANUP_EARLY: {
3538 struct obd_import *imp;
3539 imp = obd->u.cli.cl_import;
3540 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3541 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3542 ptlrpc_deactivate_import(imp);
3543 spin_lock(&imp->imp_lock);
3544 imp->imp_pingable = 0;
3545 spin_unlock(&imp->imp_lock);
3548 case OBD_CLEANUP_EXPORTS: {
3549 struct client_obd *cli = &obd->u.cli;
3551 * for echo client, export may be on zombie list, wait for
3552 * zombie thread to cull it, because cli.cl_import will be
3553 * cleared in client_disconnect_export():
3554 * class_export_destroy() -> obd_cleanup() ->
3555 * echo_device_free() -> echo_client_cleanup() ->
3556 * obd_disconnect() -> osc_disconnect() ->
3557 * client_disconnect_export()
3559 obd_zombie_barrier();
3560 if (cli->cl_writeback_work) {
3561 ptlrpcd_destroy_work(cli->cl_writeback_work);
3562 cli->cl_writeback_work = NULL;
3564 obd_cleanup_client_import(obd);
3565 ptlrpc_lprocfs_unregister_obd(obd);
3566 lprocfs_obd_cleanup(obd);
3567 rc = obd_llog_finish(obd, 0);
3569 CERROR("failed to cleanup llogging subsystems\n");
/*
 * obd_ops cleanup hook.
 *
 * If the client was attached to a shared cl_client_cache (see
 * KEY_CACHE_SET in osc_set_info_async()), it is unlinked from the cache's
 * LRU list under ccc_lru_lock, its user reference is dropped and the
 * cl_lru_left / cl_cache pointers are cleared.  Then the quota cache is
 * freed and client_obd_cleanup() performs the generic teardown.
 */
3576 int osc_cleanup(struct obd_device *obd)
3578 struct client_obd *cli = &obd->u.cli;
3584 if (cli->cl_cache != NULL) {
3585 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3586 spin_lock(&cli->cl_cache->ccc_lru_lock);
3587 list_del_init(&cli->cl_lru_osc);
3588 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3589 cli->cl_lru_left = NULL;
3590 atomic_dec(&cli->cl_cache->ccc_users);
3591 cli->cl_cache = NULL;
3594 /* free memory of osc quota cache */
3595 osc_quota_cleanup(obd);
3597 rc = client_obd_cleanup(obd);
/* Handle a configuration record for this device: dispatches on
 * lcfg->lcfg_command and passes PARAM_OSC parameters to
 * class_process_proc_param() together with the OSC lprocfs variable table. */
3603 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3605 struct lprocfs_static_vars lvars = { 0 };
3608 lprocfs_osc_init_vars(&lvars);
3610 switch (lcfg->lcfg_command) {
3612 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/* obd_ops process_config hook: @buf is passed on as the configuration
 * record to osc_process_config_base(); @len is not used. */
3622 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3624 return osc_process_config_base(obd, buf);
/* Method table for the OSC obd type; registered in osc_init() through
 * class_register_type().  Connection management (add_conn, del_conn,
 * connect) uses the generic client_* helpers; the remaining hooks are the
 * osc_* functions of this file. */
3627 struct obd_ops osc_obd_ops = {
3628 .o_owner = THIS_MODULE,
3629 .o_setup = osc_setup,
3630 .o_precleanup = osc_precleanup,
3631 .o_cleanup = osc_cleanup,
3632 .o_add_conn = client_import_add_conn,
3633 .o_del_conn = client_import_del_conn,
3634 .o_connect = client_connect_import,
3635 .o_reconnect = osc_reconnect,
3636 .o_disconnect = osc_disconnect,
3637 .o_statfs = osc_statfs,
3638 .o_statfs_async = osc_statfs_async,
3639 .o_packmd = osc_packmd,
3640 .o_unpackmd = osc_unpackmd,
3641 .o_create = osc_create,
3642 .o_destroy = osc_destroy,
3643 .o_getattr = osc_getattr,
3644 .o_getattr_async = osc_getattr_async,
3645 .o_setattr = osc_setattr,
3646 .o_setattr_async = osc_setattr_async,
3648 .o_punch = osc_punch,
3650 .o_enqueue = osc_enqueue,
3651 .o_change_cbdata = osc_change_cbdata,
3652 .o_find_cbdata = osc_find_cbdata,
3653 .o_cancel = osc_cancel,
3654 .o_cancel_unused = osc_cancel_unused,
3655 .o_iocontrol = osc_iocontrol,
3656 .o_get_info = osc_get_info,
3657 .o_set_info_async = osc_set_info_async,
3658 .o_import_event = osc_import_event,
3659 .o_llog_init = osc_llog_init,
3660 .o_llog_finish = osc_llog_finish,
3661 .o_process_config = osc_process_config,
3662 .o_quotactl = osc_quotactl,
3663 .o_quotacheck = osc_quotacheck,
/* Objects defined outside this file and used by osc_init() / osc_exit():
 * the kmem cache descriptors, and the AST guard spinlock together with its
 * lockdep class key. */
3666 extern struct lu_kmem_descr osc_caches[];
3667 extern spinlock_t osc_ast_guard;
3668 extern struct lock_class_key osc_ast_guard_class;
/*
 * Module initialisation: create the OSC kmem caches, register the OSC obd
 * type (osc_obd_ops plus the lprocfs module variables and osc_device_type),
 * and initialise the osc_ast_guard spinlock with its lockdep class.  The
 * caches are released again through lu_kmem_fini() on the error path after
 * class_register_type().
 */
3670 int __init osc_init(void)
3672 struct lprocfs_static_vars lvars = { 0 };
3676 /* print an address of _any_ initialized kernel symbol from this
3677 * module, to allow debugging with gdb that doesn't support data
3678 * symbols from modules.*/
3679 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3681 rc = lu_kmem_init(osc_caches);
3683 lprocfs_osc_init_vars(&lvars);
3685 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3686 LUSTRE_OSC_NAME, &osc_device_type);
3688 lu_kmem_fini(osc_caches);
3692 spin_lock_init(&osc_ast_guard);
3693 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/* Module teardown, undoing osc_init(): unregister the OSC obd type and
 * destroy the kmem caches. */
3698 static void /*__exit*/ osc_exit(void)
3700 class_unregister_type(LUSTRE_OSC_NAME);
3701 lu_kmem_fini(osc_caches);
/* Module metadata, and registration of osc_init() / osc_exit() as the
 * module's init and exit routines through cfs_module(). */
3704 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3705 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3706 MODULE_LICENSE("GPL");
3708 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);